Ruby 3: Non-blocking Fiber
Ruby 3 was released today (2020/12/25) with a lot of new features, among them:
- Static type analysis (RBS and TypeProf)
- Ractor
- Non-blocking fiber
I’ll focus on non-blocking fiber in this post.
I wrote an introduction to Fiber a few months ago. If you haven't heard of Fiber before, here it is: Event driven non blocking IO with Ruby’s fiber
The notable change to Fiber in Ruby 3 is the new boolean keyword option blocking for Fiber.new.
So what does this keyword mean?
With blocking: true, the fiber behaves as it did before Ruby 3.
The story becomes interesting with blocking: false (the default), which makes the fiber a non-blocking fiber.
Inside such a fiber, whenever there is a blocking operation such as IO, network or sleep, the fiber is suspended (i.e. it yields) and control is transferred to another fiber.
This only happens once a scheduler has been set for the current thread; without one, a non-blocking fiber runs like any other fiber.
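Ruby 3.0 also added Fiber#blocking?, which reports this flag. A small sketch comparing the default with blocking: true; no scheduler is set here, so both fibers simply run to completion:

```ruby
require 'fiber' # harmless on newer Rubies; Ruby 3.0 needs it for some Fiber methods

# Fiber.new defaults to blocking: false on Ruby 3.0+.
default_fiber = Fiber.new { :ran_default }
# blocking: true opts back into the pre-Ruby-3 behaviour.
blocking_fiber = Fiber.new(blocking: true) { :ran_blocking }

puts default_fiber.blocking?  # false
puts blocking_fiber.blocking? # true

# With no scheduler set, both kinds of fiber run exactly alike.
puts default_fiber.resume     # ran_default
puts blocking_fiber.resume    # ran_blocking
```

The flag only starts to matter once a scheduler is installed.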
Obviously, the next question is: how can the suspended fiber continue its execution when, say, the IO operation finishes? We need a scheduler (i.e. an event loop) to keep track of which fibers are blocked and when it's possible to resume them.
Ruby doesn’t provide a scheduler. It’s up to us to implement one however we like.
In the next part, we will implement a simple scheduler.
Fiber.scheduler
A fiber scheduler is like an event loop. It keeps track of which fibers are blocked and on which operation. When a blocking operation finishes or is unblocked, the scheduler resumes the corresponding fiber.
To implement a scheduler, we must implement the following hooks:
io_wait, process_wait, kernel_sleep, block and unblock
These hooks are called when a non-blocking fiber performs a blocking operation (e.g. IO calls io_wait, sleep calls kernel_sleep).
A skeleton of a scheduler looks like this:
class Scheduler
  # Triggered by IO#wait, IO#wait_readable, IO#wait_writable
  def io_wait(io, events, timeout)
  end
  # Triggered by Kernel#sleep, Mutex#sleep
  def kernel_sleep(duration = nil)
  end
  # Triggered by Process::Status.wait
  def process_wait(pid, flags)
  end
  # Triggered by Thread#join, Mutex
  def block(blocker, timeout = nil)
  end
  # Called when a fiber previously suspended by #block is unblocked
  def unblock(blocker, fiber)
  end
  # Called when the current thread exits
  def close
  end
end
Once we have a scheduler, we register it with Fiber.set_scheduler and can start using non-blocking fibers.
Now, let's implement a simple kernel_sleep hook.
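To see the flag and the scheduler working together, here is a sketch with a hypothetical RecordingScheduler that only logs hook calls and returns immediately instead of suspending anything, so it is not a usable scheduler. It also defines io_wait, block and unblock because some Ruby versions check for them in Fiber.set_scheduler; none of them is reached here. What it illustrates: a non-blocking fiber's sleep is routed to kernel_sleep, while a fiber created with blocking: true really sleeps.

```ruby
require 'fiber'

# Hypothetical scheduler that only records hook calls. It never suspends
# anything, so it is useless as a real scheduler; it just shows routing.
class RecordingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def kernel_sleep(duration = nil)
    @calls << [:kernel_sleep, duration]
    true # return at once instead of suspending the fiber
  end

  # Defined only so Fiber.set_scheduler accepts the object; not reached here.
  def io_wait(io, events, timeout)
    @calls << [:io_wait, events]
    events
  end

  def block(blocker, timeout = nil)
    @calls << [:block, blocker]
    true
  end

  def unblock(blocker, fiber)
    @calls << [:unblock, blocker]
  end

  def close; end
end

scheduler = RecordingScheduler.new
Fiber.set_scheduler(scheduler)

Fiber.new { sleep(0.05) }.resume                 # non-blocking: routed to kernel_sleep
Fiber.new(blocking: true) { sleep(0.05) }.resume # blocking: a real sleep, no hook

Fiber.set_scheduler(nil) # detach the scheduler again
p scheduler.calls
```

Only the first fiber's sleep shows up in calls; the blocking fiber never reaches the scheduler.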
kernel_sleep
The kernel_sleep hook is called for operations such as Kernel#sleep. For now our scheduler only needs to support the kernel_sleep and close hooks:
require 'fiber'
class SimpleScheduler
  def initialize
    # our internal Hash tracking which Fiber is sleeping and until when (wake-up time)
    @waiting = {}
  end
  def run
    # we loop until there are no more events
    # for now our event loop only checks for sleeping fibers
    while @waiting.any?
      @waiting.keys.each do |fiber|
        # this fiber's wake-up time has passed
        if current_time > @waiting[fiber]
          @waiting.delete(fiber)
          fiber.resume
        end
      end
    end
  end
  def kernel_sleep(duration = nil)
    # this hook (like the others) runs in the context of the fiber calling sleep,
    # hence Fiber.current is the fiber that needs to be halted
    # note: sleep without a duration (sleep forever) is not handled here
    @waiting[Fiber.current] = current_time + duration
    # halt this fiber and transfer control back to the fiber that resumed it
    Fiber.yield
    return true
  end
  def close
    # called when the thread exits: run the event loop until every sleeper wakes up
    run
  end
  private
  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
To use our SimpleScheduler, we will need to call Fiber.set_scheduler
scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)
# now we're ready to call sleep
# let's create two fibers
Fiber.new do
  puts "Fiber 1: sleep for 2s"
  sleep(2)
  puts "Fiber 1: wake up"
end.resume
Fiber.new do
  puts "Fiber 2: sleep for 3s"
  sleep(3)
  puts "Fiber 2: wake up"
end.resume
Running it prints:
Fiber 1: sleep for 2s
Fiber 2: sleep for 3s
Fiber 1: wake up
Fiber 2: wake up
We can see that while fiber 1 is sleeping, it is halted and fiber 2 gets its chance to run. Our scheduler keeps track of how long these fibers need to sleep and resumes each of them when its time comes, so the whole program takes about 3 seconds instead of 5.
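One way to confirm the overlap is to time it. Below is a compact, self-contained variant of the SimpleScheduler above (renamed SleepScheduler, with shorter sleeps and stub io_wait/block/unblock hooks that raise if reached). It is a sketch that drives the loop with an explicit scheduler.run call instead of waiting for the thread to exit. If the sleeps overlap, the elapsed time should be close to the longest sleep (0.3s) rather than the sum (0.5s).

```ruby
require 'fiber'

class SleepScheduler
  def initialize
    @waiting = {} # fiber => monotonic time at which it should wake up
  end

  # Busy-polls, exactly like the SimpleScheduler in this post.
  def run
    while @waiting.any?
      @waiting.keys.each do |fiber|
        next if current_time < @waiting[fiber]
        @waiting.delete(fiber)
        fiber.resume
      end
    end
  end

  def kernel_sleep(duration = nil)
    @waiting[Fiber.current] = current_time + duration
    Fiber.yield
    true
  end

  # Stubs so Fiber.set_scheduler accepts the object; this demo never uses them.
  def io_wait(io, events, timeout)
    raise NotImplementedError, 'io_wait is not needed for this demo'
  end

  def block(blocker, timeout = nil)
    raise NotImplementedError, 'block is not needed for this demo'
  end

  def unblock(blocker, fiber)
    raise NotImplementedError, 'unblock is not needed for this demo'
  end

  def close
    run
  end

  private

  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

scheduler = SleepScheduler.new
Fiber.set_scheduler(scheduler)
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

Fiber.new { sleep(0.2) }.resume # suspends almost immediately
Fiber.new { sleep(0.3) }.resume # also suspends almost immediately

scheduler.run            # drive the loop until both fibers have woken up
Fiber.set_scheduler(nil) # detach; close finds nothing left to do
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format('elapsed: %.2fs', elapsed) # roughly the longest sleep, not the sum
```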
io_wait
We’ll use this simple program to test io_wait
require 'net/http'
Fiber.new do
  Net::HTTP.get(URI('https://www.google.com'))
end.resume
The documentation for the io_wait hook reads:
io_wait(io, events, timeout):
events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.
Expected to return the subset of events that are ready immediately.
So the strategy is to maintain two hashes in our scheduler, @readable and @writable.
We'll use IO.select to find out when an IO object is ready.
class SimpleScheduler
  def initialize
    # our internal Hash to track which Fiber is waiting for a read event
    @readable = {}
    # our internal Hash to track which Fiber is waiting for a write event
    @writable = {}
    @waiting = {}
  end
  def io_wait(io, events, timeout)
    # events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.
    unless (events & IO::READABLE).zero?
      @readable[io] = Fiber.current
    end
    unless (events & IO::WRITABLE).zero?
      @writable[io] = Fiber.current
    end
    # small exercise: add IO::PRIORITY (timeout is ignored as well)
    # halt fiber and transfer control to its parent
    Fiber.yield
    # when it's ready to read/write, resume fiber
    # (simplification: we return the requested events, not only the ready ones)
    return events
  ensure
    # if we waited for both events, drop the registration that didn't fire,
    # so the event loop can't resume this fiber a second time
    @readable.delete(io)
    @writable.delete(io)
  end
end
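Our io_wait returns the events it was asked for, whereas the documentation asks for the subset that is ready right away. A sketch of computing that subset with a non-blocking IO.select poll follows; ready_events is a hypothetical helper, and IO::PRIORITY is left out, just as in io_wait above.

```ruby
# Hypothetical helper: which of the requested events are ready right now?
def ready_events(io, events)
  wants_read  = !(events & IO::READABLE).zero?
  wants_write = !(events & IO::WRITABLE).zero?
  # A timeout of 0 polls without blocking; IO.select returns nil if nothing is ready.
  readable, writable, = IO.select(wants_read ? [io] : [], wants_write ? [io] : [], [], 0)

  ready = 0
  ready |= IO::READABLE if readable&.include?(io)
  ready |= IO::WRITABLE if writable&.include?(io)
  ready
end

reader, writer = IO.pipe
before = ready_events(reader, IO::READABLE) # 0: nothing has been written yet
writer.write('.')
after = ready_events(reader, IO::READABLE)  # IO::READABLE: a byte is waiting
room = ready_events(writer, IO::WRITABLE)   # IO::WRITABLE: the pipe has free space
p [before, after, room]
```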
Now our event loop needs to handle @readable and @writable as well:
class SimpleScheduler
  def run
    while @readable.any? or @writable.any? or @waiting.any?
      # select's signature:
      # select(read_array [, write_array [, error_array [, timeout]]]) → array or nil
      # (it returns nil when nothing is ready, hence the &. below)
      readable, writable = IO.select(@readable.keys, @writable.keys, [], 0)
      # when an io is ready, we remove it from @readable/@writable
      # and resume its corresponding fiber (if it hasn't been resumed already)
      readable&.each do |io|
        fiber = @readable.delete(io)
        fiber&.resume
      end
      writable&.each do |io|
        fiber = @writable.delete(io)
        fiber&.resume
      end
      @waiting.keys.each do |fiber|
        if current_time > @waiting[fiber]
          @waiting.delete(fiber)
          fiber.resume
        end
      end
    end
  end
end
However, when we run this scheduler against our test program (the one making an HTTP request to Google), we get this exception:
undefined method ‘block’ for #<SimpleScheduler>
which implies that the block/unblock hooks are called somewhere along the code path of Ruby's HTTP library.
Let’s add block/unblock
block/unblock
# our block implementation is pretty simple:
# we maintain a @blocking counter tracking how many fibers are blocked,
# increasing it on a block event and decreasing it when the fiber is resumed
def block(blocker, timeout = nil)
  @blocking += 1
  begin
    Fiber.yield
  ensure
    @blocking -= 1
  end
end
# this is called when a previously blocked fiber is unblocked (possibly from another thread)
# we use an IO.pipe to notify the event loop that a fiber has been unblocked
# ie: @urgent = IO.pipe, set up in initialize along with @blocking = 0 and @ready = []
# @ready is an array containing the fibers that are unblocked
def unblock(blocker, fiber)
  @ready << fiber
  # write to the pipe to wake up the event loop
  io = @urgent.last
  io.write_nonblock('.')
end
# we also need to update our event loop
def run
  # let's review what we're tracking:
  # @readable: for fibers that are waiting to read
  # @writable: for fibers that are waiting to write
  # @waiting: for fibers that are sleeping
  # @blocking: for fibers that are blocked
  # @ready: for fibers that are unblocked
  while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? or @ready.any?
    readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], 0)
    # drain the notification pipe; otherwise it stays readable (and eventually fills up)
    if readable&.delete(@urgent.first)
      @urgent.first.read_nonblock(1024, exception: false)
    end
    readable&.each do |io|
      if fiber = @readable.delete(io)
        fiber.resume
      end
    end
    writable&.each do |io|
      if fiber = @writable.delete(io)
        fiber.resume
      end
    end
    @waiting.keys.each do |fiber|
      if current_time > @waiting[fiber]
        @waiting.delete(fiber)
        fiber.resume
      end
    end
    # we simply resume all fibers in @ready
    ready, @ready = @ready, []
    ready.each do |fiber|
      fiber.resume
    end
  end
end
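The @urgent pipe is the classic self-pipe trick: a byte written to a pipe wakes up an IO.select that is watching its read end. Here it is in isolation, as a sketch with hypothetical names. A thread stands in for whatever calls unblock, and the Mutex around the array is an extra precaution that the scheduler above does not take, since unblock may be called from another thread.

```ruby
# Self-pipe trick in isolation; all names are hypothetical.
urgent_reader, urgent_writer = IO.pipe
ready = []
lock = Mutex.new

# Stands in for whatever calls unblock, possibly from another thread.
notifier = Thread.new do
  sleep(0.1)
  lock.synchronize { ready << :unblocked_fiber }
  urgent_writer.write_nonblock('.') # one byte is enough to wake the loop
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# Would block for up to 5 seconds, but the byte wakes it after about 0.1s.
readable, = IO.select([urgent_reader], [], [], 5)
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# Drain the pipe; otherwise it stays readable and every later select
# returns immediately.
urgent_reader.read_nonblock(1024, exception: false) if readable&.include?(urgent_reader)
still_readable = IO.select([urgent_reader], [], [], 0)
notifier.join

batch = lock.synchronize do
  taken = ready.dup
  ready.clear
  taken
end
p batch # the fibers the event loop would now resume
```

Because the pipe can wake a blocked IO.select, the event loop does not have to poll with a timeout of 0 just to notice unblocked fibers.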
I’ve added the complete implementation of simple_scheduler at the end of this post.
Conclusion
Non-blocking Fiber provides a simple interface for handling non-blocking IO. As users, we probably won't deal with fibers directly but through wrappers such as Async.
In Ruby 3.0, non-blocking Fiber is still in development, and some IO operations aren't Fiber-friendly yet. We'll need to wait some time for it to mature.
Lastly, there are some improvements that I’ll leave as exercises:
- In our IO.select call, we return immediately if there is no event (by passing 0 as its last parameter). It would be more efficient to wait for a while. How can we achieve that?
- Implement the process_wait hook.
- Our event loop is a while loop that eats up CPU time when there is no event. Can we improve it?
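For the first and third exercises, one possible direction is to let IO.select block instead of polling with 0. Since unblock already writes to the @urgent pipe, a blocking select still wakes up when a fiber is unblocked. The helper below is hypothetical and only computes the timeout; it assumes @waiting maps fibers to wake-up times as in SimpleScheduler and that @ready holds unblocked fibers.

```ruby
# Hypothetical helper for choosing IO.select's timeout.
#   waiting: Hash of fiber => monotonic wake-up time (like @waiting)
#   ready:   Array of unblocked fibers (like @ready)
#   now:     the current monotonic time
def select_timeout(waiting, ready, now)
  return 0 unless ready.empty? # fibers are ready to run: just poll
  return nil if waiting.empty? # no sleepers: block until some IO (or the @urgent pipe) is ready
  [waiting.values.min - now, 0].max # wait until the earliest wake-up, never negative
end

p select_timeout({}, [:fiber], 10.0)             # 0
p select_timeout({}, [], 10.0)                   # nil
p select_timeout({ a: 12.5, b: 11.0 }, [], 10.0) # 1.0
p select_timeout({ a: 9.0 }, [], 10.0)           # 0
```

In run, its result would replace the 0 in the IO.select call, for example select_timeout(@waiting, @ready, current_time). Timeouts passed to block and io_wait would still need handling on top of this.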