Ruby 3: Non-blocking Fiber
Ruby 3 was released today (2020/12/25) with a lot of new features, among them:
- Static type analysis (RBS, TypeProf)
- Ractor
- Non-blocking fiber
I’ll focus on non-blocking fiber in this post.
I wrote an introduction to Fiber some months ago. In case you haven’t heard of Fiber before, here it is: Event driven non blocking IO with Ruby’s fiber
A notable change to Fiber in Ruby 3 is the new boolean option blocking for every Fiber.new call.
So what does this keyword mean?
When blocking: true, the fiber behaves as it did before Ruby 3.
The story becomes interesting when blocking: false (this is the default). This makes the fiber a non-blocking fiber.
Inside such a fiber, whenever there is a blocking operation such as IO, network access or sleep, the fiber is suspended (i.e. it yields) and control is transferred to another fiber.
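The flag can be inspected directly. The following is a minimal sketch, assuming Ruby 3.0 or newer; the helper name blocking_flags is made up for this illustration and is not part of any API.

```ruby
# Assumes Ruby 3.0+. Fiber#blocking? reports how a fiber was created.
# blocking_flags is a hypothetical helper used only for this illustration.
def blocking_flags
  blocking     = Fiber.new(blocking: true) { :unused }
  non_blocking = Fiber.new { :unused } # blocking: false is the default
  [blocking.blocking?, non_blocking.blocking?]
end

p blocking_flags
```

Neither fiber is resumed here; the sketch only shows that the option is recorded per fiber at creation time.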
Obviously, the next question is: how can the suspended fiber continue its execution once, say, the IO operation has finished? We need a scheduler (i.e. an event loop) to keep track of which fibers are blocked and when it’s possible to resume them.
Ruby doesn’t provide a scheduler, only the interface. It’s up to us to implement the scheduler however we like.
In the next part, we will implement a simple scheduler.
Fiber.scheduler
A fiber scheduler is like an event loop. It keeps track of which fibers are blocked and on which blocking operation. When a blocking operation finishes or is unblocked, the scheduler resumes the corresponding fiber.
To implement a scheduler, we must implement the following hooks:
- io_wait
- process_wait
- kernel_sleep
- block
- unblock
These hooks are called when a non-blocking fiber performs a blocking operation, e.g. IO (io_wait) or sleep (kernel_sleep).
A naive scheduler skeleton can look like this:
class Scheduler
  # triggered by: IO#wait, IO#wait_readable, IO#wait_writable
  def io_wait(io, events, timeout)
  end

  # triggered by: Kernel#sleep, Mutex#sleep
  def kernel_sleep(duration = nil)
  end

  # triggered by: Process::Status.wait
  def process_wait(pid, flags)
  end

  # triggered by: Thread#join, Mutex
  def block(blocker, timeout = nil)
  end

  # triggered when a fiber previously blocked via block is unblocked
  def unblock(blocker, fiber)
  end

  # called when the current thread exits
  def close
  end
end
With a scheduler in place, we can start using non-blocking fibers by calling Fiber.set_scheduler.
Now, let’s implement a simple kernel_sleep hook.
kernel_sleep
The kernel_sleep hook is called for events such as Kernel#sleep. Our scheduler needs to support the kernel_sleep and close hooks:
require 'fiber'

class SimpleScheduler
  def initialize
    # internal Hash tracking which fiber is sleeping and until when
    @waiting = {}
  end

  def run
    # loop until there are no more events;
    # for now the event loop only checks for sleeping fibers
    while @waiting.any?
      @waiting.keys.each do |fiber|
        # the fiber needs to wake up
        if current_time > @waiting[fiber]
          @waiting.delete(fiber)
          fiber.resume
        end
      end
    end
  end

  def kernel_sleep(duration = nil)
    # this hook (like the others) runs in the context of the fiber calling sleep,
    # hence Fiber.current is the fiber that needs to be halted
    # note: duration is nil for a bare `sleep`; this sketch assumes a number
    @waiting[Fiber.current] = current_time + duration
    # halt this fiber and transfer control to its parent
    Fiber.yield
    return true
  end

  def close
    run
  end

  private

  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
To use our SimpleScheduler, we need to call Fiber.set_scheduler:
scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)

# now we're ready to call sleep
# let's create two fibers
Fiber.new do
  puts "Fiber 1: sleep for 2s"
  sleep(2)
  puts "Fiber 1: wake up"
end.resume

Fiber.new do
  puts "Fiber 2: sleep for 3s"
  sleep(3)
  puts "Fiber 2: wake up"
end.resume
which, when run, prints:
Fiber 1: sleep for 2s
Fiber 2: sleep for 3s
Fiber 1: wake up
Fiber 2: wake up
We can see that while fiber 1 is sleeping, it is halted and fiber 2 gets its chance to run. Our scheduler keeps track of how long each fiber needs to sleep and resumes it when the time comes.
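To check the interleaving without reading console output, the sketch below records the order of events and the elapsed time. It assumes Ruby 3.0 or newer. SleepOnlyScheduler and sleep_demo are hypothetical names; the class follows the same idea as SimpleScheduler, reduced to the sleep path. The io_wait, block and unblock stubs are an assumption on my side: some Ruby versions appear to reject a scheduler that does not respond to them, so the sketch defines them even though this demo never triggers them.

```ruby
require 'fiber' # needed for Fiber.current on Ruby 3.0

# Hypothetical scheduler for this illustration: it only handles sleep.
class SleepOnlyScheduler
  def initialize
    @waiting = {} # fiber => monotonic wake-up time
  end

  def run
    until @waiting.empty?
      @waiting.keys.each do |fiber|
        next unless now > @waiting[fiber]

        @waiting.delete(fiber)
        fiber.resume
      end
    end
  end

  def kernel_sleep(duration = nil)
    @waiting[Fiber.current] = now + duration.to_f
    Fiber.yield
    true
  end

  def close
    run
  end

  # Stubs for hooks this demo never triggers (assumed to be required
  # by some Ruby versions when the scheduler is installed).
  def io_wait(io, events, timeout)
    raise NotImplementedError
  end

  def block(blocker, timeout = nil)
    raise NotImplementedError
  end

  def unblock(blocker, fiber)
    raise NotImplementedError
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Runs two sleeping fibers and returns [event order, elapsed seconds].
def sleep_demo
  events = []
  scheduler = SleepOnlyScheduler.new
  Fiber.set_scheduler(scheduler)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  Fiber.new { events << :f1_sleep; sleep(0.1); events << :f1_wake }.resume
  Fiber.new { events << :f2_sleep; sleep(0.2); events << :f2_wake }.resume

  scheduler.run # drive the event loop explicitly instead of waiting for close
  [events, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
end

SLEEP_DEMO_EVENTS, SLEEP_DEMO_ELAPSED = sleep_demo
p SLEEP_DEMO_EVENTS
puts format('elapsed: %.2fs', SLEEP_DEMO_ELAPSED)
```

Both fibers start sleeping before either wakes up, so the elapsed time should be close to the longest sleep rather than the sum of the two.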
io_wait
We’ll use this simple program to test io_wait:
require 'net/http'

Fiber.new do
  Net::HTTP.get(URI('https://www.google.com'))
end.resume
The Fiber documentation for io_wait reads:
io_wait(io, events, timeout):
events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.
Expected to return the subset of events that are ready immediately.
So the strategy here is to maintain two hashes inside our scheduler, @readable and @writable.
We’ll use IO.select to find out when an IO object is ready.
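For readers who haven’t used IO.select before, here is a small sketch of its return values on a pipe, with a timeout of 0 so it never waits. The method name select_demo and the hash keys are made up for this illustration.

```ruby
# Hypothetical demo: what IO.select reports for a pipe before and after a write.
def select_demo
  reader, writer = IO.pipe

  # Empty pipe: nothing to read yet, but there is room to write.
  readable, writable = IO.select([reader], [writer], [], 0)
  result = {
    readable_before: !readable.empty?,
    writable_before: writable == [writer]
  }

  writer.write('ping')

  # Now the read end has data, so it shows up in the first array.
  readable, = IO.select([reader], [], [], 0)
  result.merge(readable_after: readable == [reader])
ensure
  reader&.close
  writer&.close
end

p select_demo
```

IO.select returns nil when nothing is ready before the timeout, which is why the event loop in this post uses the safe navigation operator on its results.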
class SimpleScheduler
  def initialize
    # internal Hash tracking which fiber is waiting for a read event
    @readable = {}
    # internal Hash tracking which fiber is waiting for a write event
    @writable = {}
    @waiting = {}
  end

  def io_wait(io, events, timeout)
    # events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.
    unless (events & IO::READABLE).zero?
      @readable[io] = Fiber.current
    end
    unless (events & IO::WRITABLE).zero?
      @writable[io] = Fiber.current
    end
    # small exercise: add IO::PRIORITY
    # halt the fiber and transfer control to its parent
    Fiber.yield
    # we get here once the event loop has resumed the fiber because io is ready
    return events
  end
end
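The bit-mask checks used in io_wait can be tried in isolation. This is a minimal sketch assuming Ruby 3.0 or newer, where IO::READABLE, IO::PRIORITY and IO::WRITABLE are defined; event_names is a hypothetical helper, not part of the scheduler interface.

```ruby
# Hypothetical helper: lists which IO events are set in a bit mask.
def event_names(events)
  names = []
  names << :readable unless (events & IO::READABLE).zero?
  names << :priority unless (events & IO::PRIORITY).zero?
  names << :writable unless (events & IO::WRITABLE).zero?
  names
end

p event_names(IO::READABLE | IO::WRITABLE)
```

A caller that waits for both reading and writing passes the two flags OR-ed together, which is why io_wait tests each flag separately instead of comparing events for equality.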
Now, our event loop needs to handle @readable and @writable:
class SimpleScheduler
  def run
    while @readable.any? || @writable.any? || @waiting.any?
      # select's signature:
      # select(read_array [, write_array [, error_array [, timeout]]]) → array or nil
      readable, writable = IO.select(@readable.keys, @writable.keys, [], 0)

      # when an io is ready, we remove it from @readable/@writable
      # and resume its corresponding fiber
      readable&.each do |io|
        fiber = @readable.delete(io)
        fiber.resume
      end

      writable&.each do |io|
        fiber = @writable.delete(io)
        fiber.resume
      end

      @waiting.keys.each do |fiber|
        if current_time > @waiting[fiber]
          @waiting.delete(fiber)
          fiber.resume
        end
      end
    end
  end
end
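The io_wait path can also be exercised without network access by waiting on a pipe. The sketch below assumes Ruby 3.0 or newer and that IO#wait_readable reaches the io_wait hook when called from a non-blocking fiber. PipeScheduler and pipe_demo are hypothetical names, and the scheduler is a read-only reduction of the one above. The kernel_sleep, block and unblock stubs are defined only because some Ruby versions appear to require them when a scheduler is installed.

```ruby
require 'fiber'   # Fiber.current on Ruby 3.0
require 'io/wait' # IO#wait_readable on Ruby 3.0

# Hypothetical scheduler for this illustration: it only tracks readers.
class PipeScheduler
  def initialize
    @readable = {} # io => fiber waiting to read from it
  end

  def io_wait(io, events, timeout)
    @readable[io] = Fiber.current unless (events & IO::READABLE).zero?
    Fiber.yield
    events
  end

  def run
    until @readable.empty?
      ready, = IO.select(@readable.keys, [], [], 0.1)
      ready&.each { |io| @readable.delete(io)&.resume }
    end
  end

  def close
    run
  end

  # Stubs for hooks this demo never triggers.
  def kernel_sleep(duration = nil)
    raise NotImplementedError
  end

  def block(blocker, timeout = nil)
    raise NotImplementedError
  end

  def unblock(blocker, fiber)
    raise NotImplementedError
  end
end

# Returns the order in which things happened.
def pipe_demo
  log = []
  reader, writer = IO.pipe
  scheduler = PipeScheduler.new
  Fiber.set_scheduler(scheduler)

  Fiber.new do
    log << :waiting
    reader.wait_readable # suspends the fiber through io_wait
    log << reader.read_nonblock(5)
  end.resume

  log << :main_continues # the fiber is parked, so the main fiber keeps going
  writer.write('hello')
  scheduler.run          # IO.select sees the data and resumes the fiber
  log
ensure
  reader&.close
  writer&.close
end

PIPE_DEMO_LOG = pipe_demo
p PIPE_DEMO_LOG
```

The main fiber gets control back as soon as the reading fiber waits, which is the behaviour the Net::HTTP test program relies on.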
However, when we run this scheduler against our test program (the one making an HTTP request to Google), we get this exception:
undefined method ‘block’ for #<SimpleScheduler>
which implies that somewhere in Ruby’s HTTP library, the block/unblock hooks are called.
Let’s add block/unblock.
block/unblock
# our block implementation is pretty simple:
# we maintain a @blocking counter (initialized to 0) tracking how many fibers are blocked,
# increase it when there's a block event
# and decrease it when the fiber is resumed
def block(blocker, timeout = nil)
  @blocking += 1
  begin
    Fiber.yield
  ensure
    @blocking -= 1
  end
end

# called when a previously blocked fiber is unblocked
# we use an IO.pipe to notify the event loop that a fiber was unblocked,
# i.e. @urgent = IO.pipe
# @ready is an array (initialized to []) containing the fibers that are unblocked
def unblock(blocker, fiber)
  @ready << fiber
  # write to the pipe to notify the event loop
  io = @urgent.last
  io.write_nonblock('.')
end

# we also need to update our event loop
def run
  # let's review what we're tracking:
  # @readable: fibers that are waiting to read
  # @writable: fibers that are waiting to write
  # @waiting: fibers that are sleeping
  # @blocking: number of fibers that are blocked
  # @ready: fibers that are unblocked
  while @readable.any? || @writable.any? || @waiting.any? || @blocking.positive? || @ready.any?
    readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], 0)

    readable&.each do |io|
      if fiber = @readable.delete(io)
        fiber.resume
      elsif io == @urgent.first
        # drain the bytes written by unblock, otherwise the pipe stays readable
        @urgent.first.read_nonblock(1024)
      end
    end

    writable&.each do |io|
      if fiber = @writable.delete(io)
        fiber.resume
      end
    end

    @waiting.keys.each do |fiber|
      if current_time > @waiting[fiber]
        @waiting.delete(fiber)
        fiber.resume
      end
    end

    # we simply resume all fibers in @ready
    ready, @ready = @ready, []
    ready.each do |fiber|
      fiber.resume
    end
  end
end
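The @urgent pipe is an instance of what is often called the self-pipe trick: writing a byte makes the read end show up in IO.select, and reading the byte back lets select go idle again. A minimal sketch in isolation, with made-up names (self_pipe_demo, wake_reader, wake_writer):

```ruby
# Hypothetical demo of the wake-up pipe used by unblock and the event loop.
def self_pipe_demo
  wake_reader, wake_writer = IO.pipe

  idle = IO.select([wake_reader], [], [], 0)       # nobody has signalled yet
  wake_writer.write_nonblock('.')                  # what unblock does
  signalled, = IO.select([wake_reader], [], [], 0) # the read end is now ready
  wake_reader.read_nonblock(1024)                  # drain the notification byte
  drained = IO.select([wake_reader], [], [], 0)    # quiet again

  [idle.nil?, signalled == [wake_reader], drained.nil?]
ensure
  wake_reader&.close
  wake_writer&.close
end

p self_pipe_demo
```

With a timeout of 0 the pipe makes little practical difference, but it becomes the only way to wake the loop once IO.select is allowed to wait.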
I’ve added the complete implementation of simple_scheduler at the end of this post.
Conclusion
Non-blocking Fiber provides a simple interface for handling non-blocking IO. As users, we probably won’t deal with fibers directly but through wrappers such as Async.
In Ruby 3.0, non-blocking Fiber is still under development, and some IO operations aren’t Fiber-friendly yet. We’ll need to wait some time for it to mature.
Lastly, there are some improvements that I’ll leave as exercises:
- In our IO.select call, we return immediately if there is no event (by passing 0 as its last parameter). It would be more efficient if it could wait for a while. How can we achieve that?
- Implement the process_wait hook.
- Our event loop is a while loop that eats up CPU time if there is no event. Can we improve it?
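As a hint for the IO.select timeout and CPU usage exercises, one possible direction is to let IO.select wait until the next sleeping fiber is due. The helper below is only a sketch of that idea; select_timeout is a hypothetical name, and waiting is assumed to map fibers to monotonic wake-up times like @waiting in this post.

```ruby
# Hypothetical helper: how long IO.select may wait before the next sleeper is due.
# waiting maps fiber => monotonic wake-up time; now is the current monotonic time.
def select_timeout(waiting, now)
  return nil if waiting.empty?      # nil lets IO.select wait until some IO is ready

  [waiting.values.min - now, 0].max # never pass a negative timeout
end

p select_timeout({ fiber_a: 12.5, fiber_b: 20.0 }, 10.0)
```

Returning nil is only safe when something can still wake the loop, such as a registered IO or the @urgent pipe, so a real scheduler would need to check that before waiting without a limit.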