RubyのFiberを使用したイベント駆動型ノンブロッキングIO
FiberはRuby 1.9から導入されています。しかし、最近ではRubyの並行処理性能の向上に向けた積極的な開発により、再び注目されています。本記事では、Fiberの基本的な使い方について説明します。
簡単に言うと、Fiberは停止と再開が可能な関数です。この「関数を任意のタイミングで停止・再開できる」ことがどのように役立つのかを理解するために、従来の逐次実行モデルと比較してみましょう。
# sequencial.rb
def foo
  puts "foo 1"
  puts "foo 2"
end
def bar
  puts "bar 1"
  puts "bar 2"
end
foo
bar
実行順序としては、foo が完了しない限り bar は開始できません。つまり、foo が実行中の間は bar は動作しません。その結果、次のような出力になります:
foo 1
foo 2
bar 1
bar 2
このモデルでは、どのようにして foo と bar を同時に実行できるでしょうか?
簡単です。それぞれを独自のスレッドに配置すればよいのです。(ただし、MRI Ruby の GIL により foo と bar が並行実行されることはありませんが、ここでは仮に並行して実行されると想定するか、あるいは JRuby を使用することを考えましょう。)
# thread.rb
t1 = Thread.new do
  foo
end
t2 = Thread.new do
  bar
end
t1.join
t2.join
foo と bar の出力が交互に入り混じっているのが分かります。
bar 1
foo 1
bar 2
foo 2
各関数ごとに新しいスレッドを生成することで、OS(または今回の場合はRubyのVM)が foo と bar をいつどのように同時実行するかを決定します。
このモデルは プリエンプティブマルチタスキング(Preemptive Multitasking) として知られています。
一方で、協調型マルチタスキング(Cooperative Multitasking)(または 非プリエンプティブマルチタスキング(Non-Preemptive Multitasking))と呼ばれる別のモデルも存在します。
このモデルでは、OSやVMが関数のスケジュールを管理するのではなく、自分自身で関数を開始・停止・再開することで並行処理を実現します。
RubyのFiberは、タスクを自分でスケジュールするための基本的な仕組みを提供しています。
では、いくつかの例を見てみましょう。
Fiberについて
Fiberを定義するには、Fiber.new を使用します。
# fiber.rb
foo = Fiber.new do
  puts "foo 1"
  Fiber.yield
  puts "foo 2"
end
bar = Fiber.new do
  puts "bar 1"
  Fiber.yield
  puts "bar 2"
end
fiber.rb を実行しても何も起こりません。これは、まだFiberを開始していないためです。
Fiberを開始するには、Fiber#resume を使用します。
foo.resume
bar.resume
次のような出力が得られます。
foo 1
bar 1
今回の例では、Fiber.yield を使用しました。これは、現在のFiberの実行を一時停止し、制御を親側に戻すことを意味します。
最初の foo.resume の呼び出しでは、foo を最初の yield に到達するまで実行し、そこで停止します。
その後、bar を実行し、同様に最初の yield で停止します。
この動作によって、出力結果が説明できます。
次に、bar の実行を継続させ、その後 foo に切り替えることができます。
foo.resume # output: foo 1
bar.resume # output: bar 1
bar.resume # output: bar 2
foo.resume # output: foo 2
ご覧の通り、出力が foo と bar の間で交互に切り替わり、まるでマルチスレッドのような動作をしています。
次に、Fiber間でメッセージをやり取りする方法を見ていきましょう。
fiberとメッセージを送る方法
foo = Fiber.new do |a|
        b = a * Fiber.yield("hello")
        return b
      end
value1 = foo.resume(6) # inside fiber: a = 6; value1 = hello
value2 = foo.resume(7) # inside fiber: Fiber.yield return 7; val2 = 42
呼び出し元からFiberへ値を渡す
Fiberに値を渡すには、Fiber#resume の引数として渡します。
- 最初の foo.resume(6)の呼び出しでは、6がfooの初期パラメータ(つまりa)として渡されます。
- 2回目の foo.resume(7)の呼び出しでは、Fiberの内部で7がFiber.yieldの戻り値となるため、b = 6 * 7となります。
Fiberから呼び出し元へ値を渡す
Fiberから呼び出し元に値を渡すには、Fiber.yield の引数として渡します。
- 最初の Fiber.yieldの呼び出しでは、文字列"hello"をFiberの呼び出し元に返すため、value1 = "hello"となります。
- 2回目の resumeの呼び出しでは、bの値(42)が返されるため、value2 = 42となります。
次に、Fiberの応用に入る前に、無限に続くフィボナッチ数列のジェネレーター を実装してみましょう。
fibo = Fiber.new do
  a, b = 1, 1
  while true
    Fiber.yield(a)
    a, b = b, a + b
  end
end
10.times{ puts fibo.resume }
fiberの応用: EventMachineとの組み合わせ
FiberをEventMachineと組み合わせることで、より直感的にプログラムを構築する別の方法が得られます。
ここに簡単な例を示します:
require 'fiber'
require 'em-http-request'
def async_get(url)
  f = Fiber.current
  http = EM::HttpRequest.new(url).get
  http.errback { f.resume("error") }
  http.callback { f.resume(http.response) }
  return Fiber.yield
end
EM.run do
  Fiber.new do
    response = async_get("https://www.google.com")
    puts response
    EM.stop
  end.resume
end
async_get は、EventMachine のコールバック機構を Fiber 内にラップしており、コードをまるで逐次実行されているかのように見せることができます。
この使い方は、JavaScript の async/await 機構に似た動作をします。
また、コード内で Fiber.current を使用しています。これは現在実行中の Fiber を返すもので、この関数は fiber ライブラリに定義されているため、require 'fiber' が必要になります。
fiberの応用: ノンブロッキングIO
Fiber を活用することで、ノンブロッキングIOを簡単に実装できます。
ここでは、Fiber を使って エコーサーバー を作成してみましょう。
基本的なアイデアとしては、各TCP接続に対応するFiberを管理するために、Hash(ハッシュ)を維持する というものです。
class Reactor
  def initialize
    @readable = {}
    @writeable = {}
  end
end
イベントループ内では、IO.select(これはブロッキング呼び出し)を使用して、利用可能なTCP接続があるかをチェックします。
その後、その接続に対応するFiberを再開 (resume) するだけです。
class Reactor
  # ...
  def run
    while @readable.any? || @writeable.any? 
      readable, writeable = IO.select(@readable, @writeable)
      readable.each{ |io| @readable[io].resume }
      writeable.each{ |io| @writeable[io].resume }
    end
  end
end
ここでは、@readable と @writeable に追加するための2つのメソッドを定義します。
その後、Fiber を一時停止し、イベントループによって再開されるのを待ちます。
class Reactor
  # ...
  def await_readable(io)
    @readable[io] = Fiber.current
    Fiber.yield
    @readable.delete(io)
  end
  def await_writeable(io)
    @writeable[io] = Fiber.current
    Fiber.yield
    @writeable.delete(io)
  end
end
では、メインプログラムに移りましょう。
まず、オブジェクトを初期化します。
reactor = Reactor.new
server = TCPServer.new('localhost', 8080)
次に、server 用の Fiber を作成し、接続ごとに新しい Fiber を作成します。
Fiber.new do
  # wait for connection, meanwhile, halt this Fiber
  reactor.await_readable(server) 
  client = server.accept
  Fiber.new do
    # wait for this socket to be readable, meanwhile, halt it
    reactor.await_readable(client) 
    request = client.gets
    # wait for this socket to be writeable, meanwhile, halt it
    client.await_writeable(client) 
    # just send back the request then close
    client.puts(request) 
    client.close
  end
end.resume
reactor.run
このプログラムにより、スレッドやマルチプロセスを使用せずに 並行接続(concurrent connections) を処理できることが分かります。
また、await_readable と await_writeable は ES6 の async/await 機構を模倣 したものです。
補足
Fiber は Ruby 固有の概念ではなく、多くのプログラミング言語が非プリエンプティブなマルチタスキングの仕組みを持っています。例えば:
- Python: asyncioのコルーチン
- JavaScript (ES6): generator
- C++20: coroutine
名前こそ異なりますが、これらは Ruby の Fiber と同じ基本概念 に基づいています。
参考文献
このリアクターサーバーのコードは、Fibers Are the Right Solution から引用しました。