Ruby 3 was just released today(2020/12/25) with a lot of new features, among them are:

  • Static type
  • Ractor
  • Non-blocking fiber

I’ll focus on non-blocking fiber in this post.

I have written an introduction about Fiber some months ago. In case you haven’t heard about Fiber before, here it is: Event driven non blocking IO with Ruby’s fiber

Notable change to Fiber in Ruby 3 is the new boolean option blocking for every Fiber.new operation. So what does this keyword mean?

When blocking: true, fiber will act as it is before.

The story becomes interesting when blocking: false(this is the default). It will make this fiber to become a non-blocking fiber. Inside this fiber whenever there is a blocking operation such as IO, network, sleep, etc, it will be suspended(ie yield) and transfer control to other fiber.

Obviously, the next question is how can the former fiber continue its execution when says the IO operation finished? We will need a scheduler(ie an event-loop) to keep track of which fiber is in blocking state and when it’s posible to resume.

Ruby doesn’t provide a scheduler. It’s up to us to implement the scheduler whatever we like.

In the next part, we will implement a simple scheduler.

Fiber.scheduler

A fiber scheduler is like an event-loop. It keeps track of which fiber is in blocking state and its corresponding blocking operation. When blocking operation finished or unblocked, scheduler will resume its execution.

To implement a scheduler, we must implement the following hooks:

  • io_wait
  • process_wait
  • kernel_sleep
  • block and unblock

These hooks will be called when a non-blocking fiber call a blocking operation(eg: IO(io_wait), sleep(kernel_sleep))

A naive implementation of scheduler can be like

class Scheduler
  # trigger by events: IO#wait, IO#wait_readable, IO#wait_writeable
  def io_wait(io, events, timeout)
  end

  # trigger by events: Kernel#sleep, Mutex#sleep
  def kernel_sleep(duration = nil)
  end

  # trigger by events: Process::Status.wait
  def process_wait(pid, flags)
  end

  # trigger by events: Thread#join, Mutex
  def block(blocker, timeout = nil)
  end

  # trigger when a previous block called is unblock
  def unblock(blocker, fiber)
  end
  
  # Called when current thread exits
  def close
  end
end

With a scheduler, we can start using non-blocking Fiber by calling Fiber.set_scheduler

Now, let implement a simple kernel_sleep hook.

kernel_sleep

kernel_sleep hook will be called for events such as Kernel.sleep. Our scheduler will need to support kernel_sleep and close hooks

require 'fiber'

class SimpleScheduler
  def initialize
    # our internal Hash to track which Fiber is sleeping and for how long
    @waiting = {}
  end

  def run
    # we will loop until there is no more event
    # Our event loop for now will only check for sleeping fibers
    while @waiting.any?
      @waiting.keys.each do |fiber|
        # fiber needs to wake up
        if current_time > @waiting[fiber]
          @waiting.delete(fiber)
          fiber.resume
        end
      end
    end
  end

  def kernel_sleep(duration = nil)
    # this function(and other hooks) will run in context of the fiber calling sleep
    # hence Fiber.current will be our target fiber that need to be halted
    @waiting[Fiber.current] = current_time + duration
    # halt this fiber and transfer control to its parent
    Fiber.yield
    return true
  end

  def close
    run
  end

  private
  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

To use our SimpleScheduler, we will need to call Fiber.set_scheduler

scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)

# now we're ready to call sleep
# let's create two fiber
Fiber.new do
  puts "Fiber 1: sleep for 2s"
  sleep(2)
  puts "Fiber 1: wake up"
end.resume

Fiber.new do
  puts "Fiber 2: sleep for 3s"
  sleep(3)
  puts "Fiber 2: wake up"
end.resume

which when running will result

Fiber 1: Sleep 2s
Fiber 2: Sleep 3s
Fiber 1: wakeup
Fiber 2: wakeup

We can see that while fiber 1 is sleeping, it is halted and fiber 2 gets its chance to run. Our scheduler keeps track of how long these fibers needs to sleep then resume when the time comes.

io_wait

We’ll use this simple program to test io_wait

require 'net/http'
Fiber.new do
  Net::HTTP.get(URI('https://www.google.com'))
end.resume

Fiber’s document of io_wait reads:

io_wait(io, events, timeout):

events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.

Expected to return the subset of events that are ready immediately.

so the stragegy here is to maintain two hash inside our scheduler, @readable and @writable. We’ll use IO#select to know when there is an io object’s ready.

class SimpleScheduler
  def initialize
    # our internal Hash to track which Fiber is waiting for a read event
    @readable = {}
    # our internal Hash to track which Fiber is waiting for a write event
    @writable = {}
    @waiting = {}
  end

  def io_wait(io, events, timeout)
    # events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.
    unless (events & IO::READABLE).zero?
      @readable[io] = Fiber.current
    end
  
    unless (events & IO::WRITABLE).zero?
      @writable[io] = Fiber.current
    end

    # small exercise: add IO::PRIORITY
  
    # halt fiber and transfer control to its parent
    Fiber.yield
    # when it's ready to read/write, resume fiber
    return events
  end
end

now, in our event-loop, we will need to handle @readable and @writable

class SimpleScheduler
  def run
    while @readable.any? or @writable.any? or @waiting.any?
      # select's signature:
      # select(read_array [, write_array [, error_array [, timeout]]]) → array or nil
      readable, writable = IO.select(@readable.keys, @writable.keys, [], 0)

      # when there's an io that's ready
      # we remove it from our @readable/@writable and resume its corresponding fiber
      readable&.each do |io|
        fiber = @readable.delete(io)
        fiber.resume
      end

      writable&.each do |io|
        fiber = @writable.delete(io)
        fiber.resume
      end

      @waiting.keys.each do |fiber|
        if current_time > @waiting[fiber]
          @waiting.delete(fiber)
          fiber.resume
        end
      end
    end
  end
end

However, when run this scheduler against our test program(the one making http request to google), we’ll get this exception:

undefined method ‘block’ for #<SimpleScheduler>

which implies that somewhere in Ruby’s http library, block/unblock hook is called.

Let’s add block/unblock

block/unblock

# our block's implementation is pretty simple
# we maintain @blocking variable, tracking how many fibers are blocked
# increase @blocking when there's block event
# decrease it when it's resumed
def block(blocker, timeout = nil)
  @blocking += 1
  begin
    Fiber.yield
  ensure
    @blocking -= 1
  end
end

# this is when previously block fiber is unblock
# we'll use an IO.pipe to notify event-loop that there's fiber unblocked
#  ie: @urgent = IO.pipe
# @ready is an array containing fiber that are unblocked
def unblock(blocker, fiber)
  @ready << fiber
  # we'll write to the pipe to notify event-loop
  io = @urgent.last
  io.write_nonblock('.')
end

# we also need to update our event-loop
def run
  # let's review what we're tracking:
  # @readable: for fibers that are waiting to read
  # @writable: for fibers that are waiting to write
  # @waiting: for fibers that are sleeping
  # @blocking: for fibers that are blocked
  # @ready: for fibers that are unblocked
  while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? or @ready.any?
    readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], 0)

    readable&.each do |io|
      if fiber = @readable.delete(io)
        fiber.resume
      end
    end

    writable&.each do |io|
      if fiber = @writable.delete(io)
        fiber.resume
      end
    end

    @waiting.keys.each do |fiber|
      if current_time > @waiting[fiber]
        @waiting.delete(fiber)
        fiber.resume
      end
    end

    # we simply resume all fibers in @ready
    ready, @ready = @ready, []
    ready.each do |fiber|
      fiber.resume
    end
  end
end

I’ve added the complete implemetation of simple_scheduler at the end of this post.

Conclusion

Non-blocking Fiber provides a simple interface to handle non-blocking IO. To us user, we probably don’t deal directly with fiber but through other wrapper such as Async.

In Ruby 3.0, non-blocking Fiber is still in development, there’re some IO event that aren’t Fiber-friendly. We’ll need to wait sometime for it to become mature.

Lastly, there’re some improvements that I’ll leave as an exercise

  • In our IO.select call, we return immediatelly if there is no event(by passing 0 as its last parameter). It would be more efficent if it can wait for a while. How can we achieve that?
  • Implement process_wait hook.
  • Our event-loop is while loop that will eat up CPU time is there is no event. Can we improve it?