Ruby に Software Transactional Memory (STM) を入れようと思った話

技術部でRubyインタプリタの開発をしている笹田です。コロナの影響で、リモート勤務などに移行し、新しい生活スタイルを満喫されている方々がたくさんいらっしゃるんじゃないかと思います。ただ、私は以前から自主的に自宅勤務することが多かったので、正直生活がぜんぜん変わっていません。

さて、家で私が何をしているかというと、Ruby 3の準備です。その中でも、数年取り組んできた Ruby で並列処理をするための仕組みである Ractor の開発をしています(以前は Guild というコードネームで呼んでいました)。Ractor という名前は、Ruby の Actor みたいな意味で付けました。Erlang とか Elixir で有名な Actor model というアレです。厳密には、Actor model でよく言われる特性をすべて備えているわけではないのですが、並列で動く Ractor を複数作ることで並列計算機上で気楽に並列処理を行うことができます(少なくとも、それができることを目標にしています)。

Ractor は、意図的に Ractor 間でメモリの共有を排除するように設計されています。しかし、どうしても共有したいなぁ、というときのために、Software Transactional Memory (STM) という仕組みを入れようと思いました。STM を使うと、DB のトランザクションのように、何か競合したらなかったことにしてやりなおすメモリを作ることができます。

本稿では、その背景と、実際にどう作ったか、そしてどう試すのか、などについてご紹介します。

Ractor のちょっとしたご紹介

本題に入る前に、本稿を読むために必要になる、Ractor についての基礎知識を少しご紹介します。 しっかりしたリファレンスは ruby/ractor.md at master · ruby/ruby にありますので、よかったら参照してみてください。

Ractor を作って並列計算する

Ractor は、複数作ってそれらが並列に動く、ということで、並列計算機上で並列に動かすことができます。

# Ractor を生成する
r = Ractor.new do
  expr # expr は、他の Ractor とは並列に動く
end

r.take #=> expr の実行、つまり Ractor 自体の処理が終了を待ち、
       #   expr の結果を得る

この例では、Ractor.new で新しい Ractor を作り、そこで expr を実行し、その結果を Ractor#take で受け取る(r.take)、という意味になります。ここでは1つしか作っていませんが、n 個作れば、n 個の Ractor が並行に処理され(Thread と一緒)、それらがシステムで許された並列度で並列に実行されます(Thread と異なる)。

ちなみに生成時に引数を渡すと、それをブロック引数で受け取ることができます。

r = Ractor.new 10 do |n|
  p n #=> 10
end
r.take

Ractor 間ではオブジェクトは(あんまり)共有されない

Ractor 上ではたいていの Ruby のプログラムを動かすことができます。つまり、上記 expr に、いろんな Ruby の式が書けます。が、Ractor 間でオブジェクトを共有することは、基本的にはできません。

# s から参照される文字列を、新しく作った Ractor と main Ractor で共有する例
# エラーになります

s = "hello"
r = Ractor.new do
  s << "world"
end

s << "ko1"
p r.take

この例では、sで参照される "Hello"という文字列を、2つのRactor(起動時からあるmain Ractorと、Ractor.newで作る子Ractorの2つ)で共有してしまう例です。それぞれの Ractor で、String#<< で文字を結合、つまり破壊的操作をしようとしています。一般的には、並列処理において、ロックなどで排他制御をしなければならない場面です。

例えば、スレッドが並列に実行されるようなJRubyなどでは、RactorではなくてThreadでこのようなコードを動かすと、Javaレベルのエラーが起きることがあります(手元で連結を何度も繰り返すようにして試してみたら、java.lang.ArrayIndexOutOfBoundsExceptionが出ました)。余談ですが、MRIは、GIL/GVLによって並列に動くことはなく、String#<<の処理中にスレッドの切り替えが起こらないことを保証しているため、問題なく動かすことができます。が、Ruby レベルの処理ではどこで切り替えがおこるかわからないため、やっぱり排他制御ちゃんと考えないと、となります。

というわけで、もしこのようなコードによって、どんなオブジェクトも複数の Ractor から同時にアクセスできるようになってしまうと、Ractor 間での同期が必須になってしまいます。

Ractorでは、Ractor間で文字列などの共有による、排他制御が必要な状況になるのを防ぐために、いろいろな工夫をしてあります。例えば、ブロックの外側のローカル変数を参照するようなブロックを Ractor.new に渡そうとすると、Ractor.new のタイミングでエラーになります。

in `new': can not isolate a Proc because it accesses outer variables (s). (ArgumentError)

こんな感じでオブジェクトを共有できないので、「ロックをちゃんとしなきゃ」といった、難しいスレッドプログラミングに関する問題から解放されます。やったね。

Ractor 間のコミュニケーション

そうは言っても、何か状態を共有したいことはあります。また、複数の Ractor が協調して動くように作る必要もあるでしょう(何かイベントをまったり、イベントが起こるまで別の Ractor を待たせたり)。そこで、Ractor では、メモリを共有するのではなく、オブジェクトをメッセージとしてコピーして送ったり受け取ったりすることで、データを共有します。

Go で言われているらしい "Do not communicate by sharing memory; instead, share memory by communicating." ということですね。Go と異なるのは、Go はいうてもメモリをいじってコミュニケーションできてしまう(メモリを共有しているので)のですが、Ractor ではコピーしちゃうので、そもそも共有ができません。Go は「気をつけようね」というニュアンスですが、Ractor では「絶対にさせないでござる」という感じです。

Ractor 間のコミュニケーションは Ractor#sendRactor.receiveおよび、Ractor.yieldRactor#takeのペアで行います。

r1 = Ractor.new do
  while msg = Ractor.receive
    Ractor.yield msg
  end
  :fin
end

r1.send 1
r1.send 2
r1.send 3
r1.send nil
p r.take #=> 1
p r.take #=> 2
p r.take #=> 3
p r.take #=> :fin

この例では、main Ractor が、作成したRactor r1に対して、1, 2, 3, nil という値を Ractor#send でメッセージとして送っています。

r1 では、Ractor.receive で send されたメッセージを受け取って(送られるまで待つ)、それをそのまま Ractor.yield に渡しています。Ractor.yield は、他の Ractor がそのオブジェクトを Ractor#take で持っていくまで待ちます。つまり、1, 2, 3 について、Ractor.yield しているわけです。最後、Ractor.yieldは nilを返すので、while 文が止まり、ブロックは :fin を返して Ractor は終了します。

main Ractor では、Ractor#takeによって、Ractor.yield された 1, 2, 3 を受け取り、表示します。 また、4 回目の Ractor#takeによって、ブロックの返値 :fin を取ります。

というのが、コミュニケーションの方法になります。

さて、メッセージとして渡すオブジェクトは毎回コピーするとご紹介しましたが、いくつかの場合、コピーなしで受け渡されます。コピー無しで受け渡されるオブジェクトのことを「共有可能オブジェクト」と呼びます。

共有可能オブジェクトの定義はこちら:

  • 不変オブジェクトは共有される
    • 不変オブジェクトとは、そのオブジェクトが freeze されており、参照するオブジェクトがすべて共有可能であること
    • 例えば、整数オブジェクトや nil とかは、frozen で参照するオブジェクトは無いので共有可能です
  • クラス・モジュールは共有される
  • その他、特別に共有可能に作られたオブジェクト
    • たとえば、Ractor オブジェクトは共有可能
    • 今回ご紹介する Ractor::TVar も共有可能オブジェクト

共有可能オブジェクトを他の Ractor に送るときは、コピーせずにリファレンスだけ送ります(共有されても、おかしなことは起こらないだろうから。もしくは、共有されても、おかしなことが起こらないように特別に設計されているから)。

それから、渡すときにコピー以外にも move が選べますが、ちょっと長くなってきたのでこの辺で。Ractor に関しては、いろんな話があります。再掲になりますが、詳細は ruby/ractor.md at master · ruby/ruby をご参照ください。

Software Transactional Memory (STM)

Ractor ではメッセージのやりとりで共有できるんですが、やっぱり一部はメモリを直接共有したいこともあるかもしれません(ないかもしれません、ちょっとわからない)。そこで、Software Transactional Memory (STM) という仕組みを入れるのはどうかと考え、実装してみました。最新の Ruby で gem でインストールすれば使えるようになっているので、よかったら試してください。

以降は、その STM の話をご紹介します。

STM が必要な背景

ちょっとしたデータを Ractor 間で共有したい例として、例えば、何かプログラム全体で数を数えたい、ってのがあります。さばいたリクエスト数かもしれません。処理したデータの総サイズを数えたいかもしれません。こういう、Ractor 間でちょっとしたデータを共有する方法が、今はありません。強いて言えば、そのデータを管理する専用のRactorを作ることで行うことができます(専用じゃなくてもいいけど、何か管理するやつ)。

counter = Ractor.new do
  cnt = 0
  while msg = Ractor.receive
    case msg
    in [:increment, n]
      cnt += n
    in [:decrement, n]
      cnt -= n
    in [:value, receiver]
      receiver.send cnt
    end
  end
end

counter << [:increment, 1] # Ractor#send は Ractor#<< という alias を持っています
counter << [:increment, 2]
counter << [:increment, 3]
counter << [:value, Ractor.current]

p Ractor.receive #=> 6

この例では、カウンターを管理するためだけの Ractor を用意してみました。実際、Actor モデルの言語では、こんな感じで作ることが多いんじゃないかと思います。そして、こういうのを簡単につくるためのライブラリが用意されています。例えば Elixir なんかだと Agent(Agent — Elixir v1.11.2。日本語での詳細は、12月に出版される プログラミングElixir(第2版) | Ohmsha とかがお勧めですよ!)とかですかね。複数の Ractor 間で安全に共有できる、変更可能な状態を作るときは、こんな感じにします。

が、もうちょっと楽に書きたいなぁ、という気分があります。「カウンターごとに Ractor 作るんですか?」って感じです(まだ、Ractor の生成は遅いのです。Thread と同程度に)(べつに、カウンターごとに作らないで、すべてのカウンターを管理する Ractor を作る、みたいな方法でできんこともないです。単純なカウンターの集合だけなら)。

そこで、メモリを共有する仕組みを用意するのはどうでしょうか。cnt = Counter.new(0) としたら、cntは複数の Ractor で共有できる、みたいな感じです。ただ、値の increment でも、ロックが必要です(Thread-safe の説明の例でよくあるアレです)。

じゃあ、ロックしないとアクセスできないようなインターフェースにすると、どうでしょうか。ロックを持たないでアクセスするのを禁止すれば、うっかりロックを忘れてしまうこともなさそうです(エラーになって気づく)。ちゃんとロックをするようにすれば、Ractor 間で排他制御されるので、まずい問題が起こらない気がします。

やってみましょう。

cnt = Counter.new(0)

r = Ractor.new cnt do
  cnt.lock{
    cnt.value += 1
  }
end

cnt.lock{ cnt.value += 2 }

r.take
p cnt.lock{ cnt.value } #=> 3

良さそうです!

さて、ここでカウンタを 2 個にしてみましょう。そして、2つのカウンタは同時に動く必要があるとしましょう。そうですね、c2 = c1 * 2 となるような関係になるという特殊なカウンタです。ロックをうまく使えば大丈夫ですかね?

c1 = Counter.new(0)
c2 = Counter.new(0)

r1 = Ractor.new do
  c2.lock do
    c1.lock do
      c1.value += 2
      c2.value = c1.value * 2
    end
  end
end

c1.lock do
  c2.lock do
    c1.value += 1
    c2.value = c1.value * 2
  end
end

#...?

こんな感じでしょうか。

実は、このプログラムはデッドロックしてしまいます。というのも、main Ractor は c1 -> c2 の順でロックをしていきます。r1 は、c2 -> c1 の順です。このとき、運悪く次のような順にロックしていくと、デッドロックしてしまいます。

  • main: c1.lock
  • r2: c2.lock
  • main: c2.lock -> できないので待つ
  • r2: c1.lock -> できないので待つ

こうならないためには、ロックの順番を、複数の Ractor でそろえる(c1->c2とか)必要があります。

とか考えていくと、ロックのアプローチはいまいちです。うっかり順番間違えるとか、普通にありそうじゃないですか。

STM のよさ

そこで使えそうなのが STM です。DB なんかで Transaction の話はよくご存じの方は多いと思いますが、これをメモリに適用したのが STM で、2010 年くらいに言語処理系界隈で研究が盛んでした。でも、今ではあんまり聞かないですねえ。言語についている STM としては、Clojure とか Haskell (Concurrent Haskell) が有名だと思います。Erlang/Elixir における mnesia も STM と Wikipedia には書いてありました、があれは DB だよなぁ。

STM は、DB のトランザクション(楽観的ロック)と同じように、とりあえずなんか読み書きして、あとで、「あ、別の Ractor とアクセスが被った!」となったらロールバックしてしまいます。簡単ですね。

ロック(悲観的ロック)と何が違うかというと、さっきの順序の問題が現れないんですよね。そもそも「ここからトランザクションです」のように指定するので、ロックの順序がない。この性質を、composable であるといいます。複数の排他制御が必要とする操作を、まとめても問題ないという良い性質です。

STM のデメリットは、操作が衝突してロールバックが多発するとむっちゃ遅くなっちゃうんですよね。この辺はフロー制御をなんとかする、みたいな研究がいろいろあります。たとえば、衝突しまくってそうなら、実行可能なスレッド(今回は Ractor)を絞っちゃうとか。

まぁあと、楽観ロックなので、みんなが read しかしないような場合は、どの処理も並列に実行可能なので速そうです。それから、進行性保証的な話もあったりして、いろいろメリットがあります。

どんな STM を作るのか

STM にもいろいろな流派があります。

  • そもそも、Software じゃない Hardware でやる HTM って分野があります。CPU がサポートしたりしています。が、あんまり最近聞かないですねえ。
  • メモリ操作を全部 transaction の対象にしてしまうという STM があります。C++ とかで多いですね。X10 という昔ちょっとかかわってた言語では、言語組み込みにこういう STM がありました。
  • 特定のメモリを transaction 対象にするという STM があります。特定のメモリしか扱わないので、それ以外のメモリはロールバックしてももとに戻りません。
  • 操作の衝突の定義もいろいろあります。

Ruby の場合は、全部ロールバックできないので(作るのスゴイ大変)、一部のメモリだけを対象にする、というようにします。具体的には、Ractor::TVar.newTVar は Transactional Variable の略)が保持する値のみ、transaction で何か問題があったらロールバックします。そして、Transaction の範囲は Ractor.atomically に渡すブロック中ということにします。

というインターフェースが、実は Class: Concurrent::TVar — Concurrent Ruby にあったんですよね。Concurrent Ruby は、Thread を対象にしています。このインターフェース踏襲し、Ractor でも使えるようにしたのが Ractor::TVar です。

先ほどのカウンターの例だと、こんな感じで書けるようにするといいでしょう。

c1 = Ractor::TVar.new(0)
c2 = Ractor::TVar.new(0)

r1 = Ractor.new c1, c2 do |c1, c2|
  # 外側のローカル変数は見えないから引数で渡す
  Ractor.atomically do
    c1.value += 2
    c2.value = c1.value * 2
  end
end

Ractor.atomically do
  c1.value += 1
  c2.value = c1.value * 2
end

main Ractor と子 Ractor で、変更が競合してしまった場合は、どちらかのブロックが再実行されます。先に紹介した通り、ロールバックされるのは Ractor::TVar#value の値だけなので、例えばインスタンス変数への代入などは残ってしまいます。IO 処理なんかも取り返しがつきません。そのため、Ractor.atomically に渡すブロックは、できるだけシンプルにする必要があります。

Ractor.atomically は自由にネストすることができます。この性質が、composable である、という話です(ロックですと、ロックの順番に気を付けないといけませんでした)。

TVar は共有可能オブジェクトなので、他の Ractor に渡すことができます。TVar に設定できる値は、他の Ractor から見えることになるので、共有可能オブジェクトに制限されます。たとえば、mutable な文字列などは渡せません。

トランザクションは、次のようなプロセスで管理されます。

  • (1) トランザクションの生成・開始
  • (2) TVar の読み書き
  • (3) トランザクションのコミット

このとき、(2) および (3) のタイミングで競合を検知し、必要ならロールバックを行って (1) に戻ります。

  • (a) (2) において、read した値がすでに他の Ractor に書き換えられていた→ロールバック
  • (b) (3) において、read した値が、すでに他の Ractor で書き換えられていた
  • (c) (3) において、write しようと思ったら、すでに他の Ractor に書き換えられていた

(c) は直観的だと思いますが(git で push しようとしたら、先に他の人が変更していて書き換えられなかった、みたいな話です)、(a), (b) はちょっと意外ではないでしょうか。つまり、書き換えの行わないトランザクションでも、ロールバックは発生し得る、という話です。

この読み込みだけでロールバックしてしまう、という挙動は、2つ以上の値を読み込むときに重要になります。tv1.value, tv2.value 2値を取り出すとき、tv1 を読み込んだ後で、他の Ractor が tv2 を書き込み、それを main Ractor で読み込んだ時、tv1tv2 が一貫性を持たない状態である可能性が出てきます。そのため、(b), (c) のタイミングで、適切な tv1, tv2 を読み込めているかチェックする、という話になります。まだちょっとわかりづらいですね。

例えば tv1 に配列のインデックス、tv2 に配列が格納されているとき、tv1 のインデックスを読み込んだ後、なんやかんやがあって他の Ractor で tv2 の配列が切り詰められたとします。このとき、すでに読み込んだインデックスは tv2 の配列の長さを超えているかもしれません。問題です。

これはつまり、tv1tv2 の一貫性が取れていない、という状況です。TVar では、このようなことが起こらないように、上記 (a)~(c) をトランザクションのロールバックタイミングとしています。

さて、1つの値だけを読みだすとき、Ractor.atomically が必要かどうかは議論が必要なところです(例えば、p Ractor.atomically{ c1.value } と書かなければならないのか、p c1.value と書くだけでよいのか)。というのも、この処理は複数読み込みもせず、write もないので、一貫性制御が要らないような気がするからです。実際、Clojure の STM や、Concurrent-ruby の TVar は、トランザクション内でなくても値を読みだすことだけはできるようになっています。

我々は、このときも Ractor.atomically を必須としました。というのも、c1.value + c2.value のように、2つ以上の値を読み込むために、うっかり Ractor.atomically を書き忘れそうな気がしたからです。

あと、カウンタとして使おうとすると、increment 処理をよくやると思うので、Ractor.atomically{ tv.value += 1 } のショートカットである Ractor::TVar#increment(n=1) を用意しています。

STM の限界

composable に記述できる STM ですが、たとえば同一トランザクション内で処理しなければならないのに、複数のトランザクションに分けてしまう、という問題はいかんともしがたいです(意図的かもしれないのでエラーにできません)。

c1 = Ractor::TVar.new(0)
c2 = Ractor::TVar.new(0)

r1 = Ractor.new c1, c2 do |c1, c2|
  Ractor.atomically do
    c1.value += 2
  end
  # 本当はここで transaction を切ってはいけない!!
  Ractor.atomically do
    c2.value = c1.value * 2
  end
end

c1 を変更後、c2 を変更する前に他の Ractor が c1, c2 を観測すると、c2 = c1 * 2 という関係が崩れている(一貫性がない)瞬間を目撃できるのです。

ちなみに、何かのカウンタなら多少の誤差は許されることもあるかもしれませんが、例えば STM でよく例に出てくる銀行口座の残高の移動というタスクにおいては大問題になってしまうかもしれません。例えば、A さんから B さんに n 円送金するとき、A さんから残高を減らして、B さんに残高を追加する、という処理になります。このとき、Aさんから残高を減らしたタイミングで他の Ractor から A, B 各氏の口座が観測され、世界から n 円消える、という瞬間を目撃していまいます。それはまずい、あってはならないことです。

(STM 自体はこのように、口座残高のような、同時に複数のデータをいっきに変える(一貫性のない状態を、ほかから見えないようにする)ときに使うことが多いと思います)

さて、この例では恣意的で、こんなミスは起こさないような気がするのですが、例えば、

def add_2 tv
  Ractor.atomically{ tv.value += 2 }
end

def set_twice tv1, tv2
  Ractor.atomically{ tv2.value = tv1.value * 2}
end

のように定義していれば、add_2(c1); set_twice(c1, c2) のように記述してしまう可能性は十分あります。

どーにかなんないか考えてみたのですが、トランザクションでの read/write のログを取れるようにしておいて、問題が発覚したら、そのログを見つめるなり自動解析ツールなりを作って、トランザクションが分かれていないかチェックする、みたいなことくらいしかできないかなぁ、と思っています。良いアイディアをご存じでしたら教えてください。

そういえば、TVar という名前は concurrent-ruby からとりましたが、T って色々ありますよね。なので、TxVar とかもう少し冗長でもいいかなぁ、などという気分があります。どうしよっかな。

STM の実装

あんまり中身の話をしてもしょうがないような気がしますが、こんなアルゴリズムで実現しています。

  • (1) トランザクション開始時に、現在の時刻 T を取得する
  • (2) TVar の読み込み時 / 書き込み
    • 書き込み時には、TVar には書かず、Ractor local なトランザクションログに書き込む
    • 読み込み時には、トランザクションログにその TVar の書き込み履歴があれば、Ractor local なその最新の値を返し、なければ読み込む。このとき、TVar に記録された最終書き込み時間と、開始時に記録した T を比較し、T が古ければればロールバック。新しければ読み込み完了だが、ついでにトランザクションログに載せておく(次の read 時は、TVar を読む必要がなくなる)
  • (3) コミット
    • コミット時、トランザクションログに記録された TVar たちについて、最終書き込み時間が T より新しくないことを確認
    • 時刻を1進める。この時の時刻を T' とする。
    • 書き込みが必要な TVar には、変更を反映。このとき、その TVar の最終書き込み時間が T' となる。

あんまり難しそうじゃないんですが、なんかこれで動くみたいです。論文的には、TL2 という方式(をちょっと弄っている)なんだそうです。

ちなみに STM を作った本当の経緯。以前から STM が欲しいと思っていました。そこで、9月にとった夏休みに STM の実現方法についてのアイディアが思いついたので、実装したら動いたヤッター、俺スゴイ、となったのです。で、調べてみたら、すでに誰かが提案していて、しかも自分が考慮していなかった箇所とかあったり、名前までついていたという。新しいことを考えるのは大変ですね(いや、別に今回は新しいことは目指してはいなかったんですが)。

ロールバックは、(2) の時は単純に例外を投げるようにしています。(3) のときは、コミットする関数で成功失敗を返し、失敗していたら最初からやりなおす、という実装にしています。

なお、Ractor と言ってますが、Thread 間でも同じように TVar が使えます。なので、Ractor ごとにトランザクションログをもつのではなく、Thread ごとに持つようにしています。この辺で Thread::TVar にするのか Ractor::TVar にするのか悩んだんですが、結局 Ractor::TVar がいいかなぁ、と思い至りました。

Ruby 3.0 における STM

この提案を、Ruby 3.0 の機能として提案してみたのですが、良さがわからん、ということで reject されました(Feature #17261: Software transactional memory (STM) for Threads and Ractors - Ruby master - Ruby Issue Tracking System)。残念。まぁ、確かに Actor っぽい仕組みでメモリを分けたのに、また別の仕組みを入れるのか、という気はしないでもないです。

ただ、実際これないとプログラム書きづらいと思うんだよなー。どうかなー。

ということで、gem を用意しました。

ractor-tvar に Ractor::TVar だけ入っていて、ractor gem は、ractor-tvar への依存があるため、gem install ractor すれば入ります。ractor gem のほうは、今は空ですが、Ractor に関するいろいろなユーティリティーを入れられるようにしようと思っています。

require 'ractor/tvar' で使えるようになります。なお、当然ですが、開発中の Ruby 3.0 上でしか動かせません(そもそも拡張ライブラリがビルドできません)。もう入っていますか?

最初は本体組み込みを前提に STM を実装していたのですが、gem に切り出すために変更が入り、性能が若干落ちています。また、コンテンションマネージメントをまじめにやっていないため、ロールバックが多発するようなシチュエーション(つまり、ある TVar への書き込みが激しいとき)では性能が凄く下がります。逐次実行時より下がります。

性能評価はまじめにやる時間がないのでスキップしますが、いくらかの評価が先のチケット(Feature #17261: Software transactional memory (STM) for Threads and Ractors - Ruby master - Ruby Issue Tracking System)にありますのでご参照ください。

おわりに

本稿では、Ruby に STM を入れたいと思った話と、それからその仕様と実装を軽くご紹介しました。Ruby 3.0 には入らないのですが、gem で使えるので、お試しいただけると良いかもしれません。

STM については、いろいろ偉そうに書きましたが、だいたいこの書籍の受け売りです: Amazon | Transactional Memory, 2nd Edition (Synthesis Lectures on Computer Architecture) | Harris, Tim, Larus, James, Rajwar, Ravi, Hill, Mark | Network Administration

また、Ruby と STM について、Context on STM in Ruby という記事が出ています。

Ruby に STM 入ると、あまり注目されない STM もまた盛り上がる気がします。性能チューニングや記事中に書いたデバッグ支援など、いろいろやることがあるので、興味ある言語処理系の研究者の方とか、共同研究とかどうでしょうか。ちゃんとやれば、学生さんの卒論修論くらいにはなるんじゃないかと思います。

さて、Ruby 3.0 は、そんなわけで Ractor も入るし他にもいろいろ入るし、夢いっぱい楽しさイッパイのリリースです。多分。そんなすてきな Ruby 3.0 をいち早くご紹介するイベントを開催するので、年の瀬ですが、もしよかったらご参加ください。

Ruby 3.0 release event - connpass

では、12月の Ruby 3.0 リリースをお楽しみに!

2020年のクックパッドAndroidアプリのアーキテクチャ事情

こんにちは、モバイル基盤部の加藤です。普段はモバイルアプリの基盤技術の整備や品質管理の業務に携わっています。 今回はクックパッドAndroidアプリ(以後クックパッドアプリ)の2020年時点でのアーキテクチャの紹介をしたいと思います。

アーキテクチャ導入以前のクックパッドアプリ

2017年以前クックパッドアプリにはアーキテクチャと呼べるようなものが存在していませんでした。大まかに API 通信や DB 操作等のデータ取得箇所を分離し、複雑なロジックを持つ場合は Manager, Util 等の強いオブジェクトが生成されていましたが、それ以外は Activity / Fragment に処理を直接記述することがほとんどでした。

そういった状況の中で今後もアプリを継続的に開発可能にすることを目的にアーキテクチャの導入が始まりました。クックパッドアプリでは iOS/Android 両プラットフォームで VIPER アーキテクチャを採用し、現在に至ります。

VIPER アーキテクチャ

クックパッドアプリで VIPER アーキテクチャを選定した理由を説明する前に、簡単に VIPER アーキテクチャを紹介します。

VIPER は View, Interactor, Presenter, Entity, Routing の頭文字を並べたもので、アーキテクチャはこれらの要素と Contract (契約)を元に構成されます。クックパッドアプリでは VIPER の要素を画面(Activity / Fragment)ごとにまとめ、VIPER の1かたまりを シーン(Scnene) と読んでいます。 これらの要素は大まかにそれぞれ以下のような責務を持ちます(フローに合わせて順序を変えています)。

  • View
    • Entityを描画する。実装クラス(Activity / Fragment)は UI を更新し、UI操作をもとに Presenter を呼び出す。
  • Presenter
    • Presentation Logic の起点を示す。実装クラスは Interactor, Routing を呼び出してPresentation Logic を実装する 。
  • Interactor
    • Presentation Logicを実現する為のBusiness Logicを示す。実装クラスは Presenter からリクエストを受け、ビジネスロジックを処理し、結果を Presenter に返す。
  • Routing
    • 発生しうる画面遷移を示す。実装クラスは Presenter からリクエストを受け画面遷移を行う。
    • 一部の記事では Router となっていますが、社内では Routing と読んでいます。
  • Entity
    • VIPER シーン中で利用されるデータそのもの。
  • Contract
    • 上記の要素を内容を定義する VIPER の核。

例えばレシピを描画するような画面の場合、以下のように Contract を定義し VIPER を構築します。

interface RecipeContract {
    interface View {
        fun renderRecipe(recipe: Recipe)
    }

    interface Presenter {
        fun onRecipeRequested(recipeId: Long)
        fun onNavigateNextRecipe(recipeId: Long)
    }

    interface Interactor {
        fun fetchRecipe(recipeId: Long): Single<Recipe>
    }

    interface Routing {
        fun navigateNextRecipe(recipeId: Long)
    }

    data class Recipe(
        id: Long,
        title: String
    )
}

さらに詳しい内容については https://www.objc.io/issues/13-architecture/viper/ 等の他記事を参照してください。 クックパッドアプリではこの VIPER を少し拡張して利用しています。具体的に拡張した箇所については後述します。

また先日サマーインターンシップでクックパッドの VIPER を題材にした技術講義を行ったので、より詳しい実装についてはこちらを参照してください。

スライド: https://speakerdeck.com/ksfee684/cookpad-summer-internship-2020-android

リポジトリ: https://github.com/cookpad/cookpad-internship-2020-summer-android

選定理由

クックパッドアプリで VIPER を採用した理由は主に3つありました。

5年先を見据えて選定

Android というプラットフォームは常に進化を続けています。プラットフォームの進化はアーキテクチャにも大きく関わり、新たな要素が使いづらいようなアーキテクチャでは継続的に開発を行うことは難しいです。実際にアーキテクチャ選定時の2017年から今までで Jetpack Compose や Kotlin Coroutines 等、Android アプリ開発において新たな要素が登場しています。こういった新たな要素を吸収することが可能であり長期的に開発を継続することが可能なアーキテクチャ、具体的には 5年 を見据えて選定を行いました。

VIPER アーキテクチャが5年耐えると判断した根拠は後述の2つの要素が中心となっています。

Contract による制約

VIPER は上述したように VIPER の各要素の内容を Contract として定義し、それに基づいて実装します。この Contract による制約は他のアーキテクチャではほとんど見られない要素であり、各要素の責務とその内容を可視化し、非常に見通しのよいコードを実現できます。

またプラットフォームの進化に合わせて VIPER の概念を拡張する場合には、この Contract を拡張すればよく、Contract でいかに定義するかを考えながらチームで議論することで、よりよいアーキテクチャを育てていける非常に拡張性の高いアーキテクチャだと判断しました。

View を中心としたイベントフロー

VIPER は View(UI) のイベントトリガーを中心にフローが構成されています。View を中心にしてフローを構築する場合、ユーザの操作仕様を直接反映するようにコードを実装する必要があります。そこで VIPER アーキテクチャに合わないような実装が必要となった場合、ユーザ体験を損ねる状態になっていると判断ができることを期待し、VIPER を選択しました。

実際にアーキテクチャに適合しないような無理のある実装があった場合には、実装やそもそもの機能仕様に問題が無いかを考えるきっかけとなっており、他のアーキテクチャではなかなか実現できなかったことだと捉えています。

2020年のクックパッドアプリのアーキテクチャ

VIPER アーキテクチャを拡張させながらクックパッドアプリに最適なアーキテクチャを今も模索しています。現在のクックパッドアプリのアーキテクチャは大雑把に以下の図のようになっていますが、その中でもアーキテクチャ導入時点からクックパッドアプリで導入した内容についていくつか紹介します。

f:id:ksfee:20201116204359p:plain

Rx によるデータフロー

VIPER の概要を説明した際に記述したように、Presenter からリクエストをうけた Interactor はデータを Presenter に返す必要があります。クックパッドではこの処理を Rx を利用してフローを構築しており、Interactor からは Observable が返ります。Presenter は受け取った Observable を subscribe し、そこで流れる Entity を View に受け渡し、UI の更新を促します。後述する Interactor から先の Domain レイヤーでも同様に Rx を利用してフローが構築されています。

最近では一部の実装で Kotlin Coroutines も利用されていますが、まだ Rx から乗り換えるという判断までは至っていません。今後 Kotlin Coroutines / Flow 等の Jetpack コンポーネントでのサポートが拡大した際には乗り換えるかもしれません(Presenter, Interactor 間のやり取りは非常に簡素なものが多いため、Coroutines への乗り換えも比較的簡単に行えるようになっています)。

Domain レイヤー

VIPER から下、具体的には Interactor から下のレイヤーについて説明します。

Interactor は Presenter からリクエストを受けた際、必要なデータを集め Presenter に返します。この時必要なデータを API や DB などから取得しますが、クックパッドアプリではここを Domain レイヤーとしてレイヤー構造を築いています。レイヤーは DataSource, DataStore, UseCase の3つからなり、それぞれ以下のように役割を分けています。

  • DataSource
    • API / DB / メモリからデータの操作を行う
    • 例: API の CRUD 操作
  • DataStore
    • 同じデータに対して複数の DataSource を参照する場合、それらの操作を抽象化して操作を行う
    • 例: API とインメモリキャッシュの操作を抽象化
  • UseCase
    • 共通化したいビジネスロジックを Interactor から切り出したもの
    • 例: 複雑な条件のダイアログ表示の判定

Interactor は DataSource, DataStore, UseCase からそれぞれ必要なデータを取得し、ビジネスロジックを構築します。Domain レイヤーとVIPERレイヤーで世界を分断することで、互いに及ぼす影響を最小限に抑えることでできるよう、Domain レイヤーで扱うデータ型と各 VIPER シーンで利用するデータ型(Entity) は異なっており、Interactor で VIPER の Entity への置き換えが行われています。

Paging の追加

2018年に発表された AAC の1つである Paging ライブラリはページング処理を RecyclerView で扱う際に非常に便利です。クックパッドアプリでもこの Paging ライブラリによるページネーションを実装していますが、Paging の DataSource (以降 PagingDataSource)をどのように実装するかが議論になりました。

PagingDataSource から返る PagedList は直接 Adapter に読み込むため、VIPER では PagedList で扱うオブジェクトは Entity である必要があります。通常であればこのような変換は Interactor で行いますが、Interactor は Presenter からのみ呼び出すことにしており PagingDataSource のどこで変換を行うかが問題になりました。

そこで Domain レイヤーから返るオブジェクトの変換と周辺ロジックをまとめて Paging という新たな VIPER の要素として定義し、ページング処理が必要となる画面はそれほど多くないため必要な画面のみ Paging を別途用意する方針としました。

今後の課題

現在のアーキテクチャは決して完璧なものではなく、開発を続けていくなかでいくつも課題は出てきます。その中でも現在進行系で直面している課題について少し紹介します。

ViewModel の位置付け

現在クックパッドアプリでは AAC の ViewModel は、Activity / Fragment でのみ利用する状態管理オブジェクトとして利用されています。先程紹介した Paging を持つ ViewModel も存在しますが、状態管理オブジェクト以上の責務を持つことはありません。

しかしただの状態管理オブジェクトとしては Android 開発において存在が大きく開発の混乱の元となってしまっており、現在アーキテクチャへ組み込む方法を検討しています。今のところは View を廃止して ViewModel に実装を寄せ、Paging を ViewModel に取り込むという意見が強く、今後議論を重ねてさらにアーキテクチャを拡張する予定です。

ボイラープレートの多さ

Contract を定義する事でコードの見通しが良くなるというメリットはありますが、その一方でVIPER は構成する要素が多く、新たに VIPER シーンを構築するために多くのファイル及び実装が必要となります。

この課題に対して AndroidStudio の LiveTemplate でファイル生成の簡略化を試みましたが、コストがかかるのは実装であり、あまりコストの軽減にはつながらずうまく行きませんでした。これはユニットテストにおいても同様のことがあり、こちらについては自動生成を行うことでコストの軽減につなげる余地がありそうなため今後検討していきたいと考えています。

まとめ

クックパッドでは今回紹介したようなアーキテクチャの改善を開発に関わる全てのメンバーで共有しながら進めています。こういった開発スタイルに興味がある Android エンジニアの方はぜひご連絡ください。

https://info.cookpad.com/careers/

nerman: AllenNLP と Optuna で作る固有表現抽出システム

事業開発部の @himkt です.好きなニューラルネットは BiLSTM-CRF です. 普段はクックパッドアプリのつくれぽ検索機能の開発チームで自然言語処理をしています.

本稿では,レシピテキストからの料理用語抽出システム nerman について紹介します. nerman の由来は ner (固有表現抽出 = Named Entity Recognition) + man (する太郎) です. クックパッドに投稿されたレシピから料理に関する用語を自動抽出するシステムであり,AllenNLP と Optuna を組み合わせて作られています. (コードについてすべてを説明するのは難しいため,実際のコードを簡略化している箇所があります)

料理用語の自動抽出

料理レシピには様々な料理用語が出現します. 食材や調理器具はもちろん,調理動作や食材の分量なども料理用語とみなせます. 「切る」という調理動作を考えても,「ざく切りにする」「輪切りにする」「みじん切りにする」など,用途に合わせて色々な切り方が存在します. レシピの中からこのような料理用語を抽出できれば,レシピからの情報抽出や質問応答などのタスクに応用できます.

料理用語の自動抽出には,今回は機械学習を利用します. 自然言語処理のタスクの中に,固有表現抽出というタスクが存在します. 固有表現抽出とは,自然言語の文(新聞記事などの文書が対象となることが多いです)から人名や地名,組織名などの固有表現を抽出するタスクです. このタスクは系列ラベリングと呼ばれる問題に定式化できます. 系列ラベリングを用いた固有表現抽出では,入力文を単語に分割したのち各単語に固有表現タグを付与します. タグが付与された単語列を抽出することで固有表現が得られます.

固有表現抽出の例
一般的な固有表現抽出(上)と料理用語への応用例(下)

今回は人名,地名などの代わりに食材名,調理器具名,調理動作の名前などを固有表現とみなしてモデルを学習します. 詳細な固有表現タグの定義は次の章で説明します.

データセット

機械学習モデルの学習には教師データが必要です. クックパッドでは言語データ作成の専門家の方に協力していただき,アノテーションガイドラインの整備およびコーパスの構築に取り組みました. レシピからの固有表現抽出については京都大学の森研究室でも研究されています(論文はこちら. PDF ファイルが開かれます). この研究で定義されている固有表現タグを参考にしつつ,クックパッドでのユースケースに合わせて次のような固有表現タグを抽出対象として定義しました.

固有表現タグの一覧
固有表現タグの一覧

この定義に基づき,クックパッドに投稿されたレシピの中から 500 品のレシピに対して固有表現を付与しました. データは Cookpad Parsed Corpus と名付けられ,社内の GitHub リポジトリで管理されています. また,機械学習モデルで利用するための前処理(フォーマットの変更など)をしたデータが S3 にアップロードされています.

Cookpad Parsed Corpus に関するアウトプットとして論文化にも取り組んでいます. 執筆した論文は自然言語処理の国際会議である COLING で開催される言語資源に関する研究のワークショップ LAW(Linguistic Annotation Workshop)に採択されました. 🎊

タイトルは以下の通りです.

Cookpad Parsed Corpus: Linguistic Annotations of Japanese Recipes
Jun Harashima and Makoto Hiramatsu

Cookpad Parsed Corpus に収録されているレシピは固有表現の他にも形態素と係り受けの情報が付与されており, 現在大学等の研究機関に所属されている方に利用いただけるように公開の準備を進めています.

準備: AllenNLP を用いた固有表現抽出モデル

nerman ではモデルは AllenNLP を用いて実装しています.

github.com

AllenNLP は Allen Institute for Artificial Intelligence (AllenAI) が開発している自然言語処理フレームワークであり, 最新の機械学習手法に基づく自然言語処理のためのニューラルネットワークを簡単に作成できる便利なライブラリです. AllenNLP は pip でインストールできます.

pip install allennlp

AllenNLP ではモデルの定義や学習の設定を Jsonnet 形式のファイルに記述します. 以下に今回の固有表現抽出モデルの学習で利用する設定ファイル(config.jsonnet)を示します. (モデルは BiLSTM-CRF を採用しています.)

  • config.jsonnet
local dataset_base_url = 's3://xxx/nerman/data';

{
  dataset_reader: {
    type: 'cookpad2020',
    token_indexers: {
      word: {
        type: 'single_id',
      },
    },
    coding_scheme: 'BIOUL',
  },
  train_data_path: dataset_base_url + '/cpc.bio.train',
  validation_data_path: dataset_base_url + '/cpc.bio.dev',
  model: {
    type: 'crf_tagger',
    text_field_embedder: {
      type: 'basic',
      token_embedders: {
        word: {
          type: 'embedding',
          embedding_dim: 32,
        },
      },
    },
    encoder: {
      type: 'lstm',
      input_size: 32,
      hidden_size: 32,
      dropout: 0.5,
      bidirectional: true,
    },
    label_encoding: 'BIOUL',
    calculate_span_f1: true,
    dropout: 0.5,
    initializer: {},
  },
  data_loader: {
    batch_size: 10,
  },
  trainer: {
    num_epochs: 20,
    cuda_device: -1,
    optimizer: {
      type: 'adam',
      lr: 5e-4,
    },
  },
}

モデル,データ,そして学習に関する設定がそれぞれ指定されています. AllenNLP はデータセットのパスとしてローカルのファイルパスだけでなく URL を指定できます. 現状では httphttps,そして s3 のスキーマに対応しているようです. (読んだコードはこのあたり) nerman では train_data_path および validation_data_path に S3 上の加工済み Cookpad Parsed Corpus の学習データ,バリデーションデータの URL を指定しています.

AllenNLP は自然言語処理の有名なタスクのデータセットを読み込むためのコンポーネントを提供してくれます. しかしながら,今回のように自分で構築したデータセットを利用したい場合には自分でデータセットをパースするクラス(データセットリーダー)を作成する必要があります. cookpad2020 は Cookpad Parsed Corpus を読み込むためのデータセットリーダーです. データセットリーダーの作成方法については公式チュートリアルで 説明されているので詳しく知りたい方はそちらを参照いただければと思います.

設定ファイルが作成できたら, allennlp train config.jsonnet --serialization-dir result のようにコマンドを実行することで学習がはじまります. 学習のために必要な情報すべてが設定ファイルにまとまっていて,実験を管理しやすいことが AllenNLP の特徴の1つです. serialization-dir については後述します.

今回の記事では紹介しませんが, allennlp コマンドには allennlp predict allennlp evaluate などの非常に便利なサブコマンドが用意されています. 詳しく知りたい方は公式ドキュメントを参照ください.

nerman の全体像

以下に nerman の全体像を示します.

nerman の全体像
nerman の全体像

システムは大きく分けて 3 つのバッチから構成されています.それぞれの役割は以下の通りです.

  • (1) ハイパーパラメータ最適化
  • (2) モデルの学習
  • (3) 実データ(レシピ)からの固有表現抽出(予測)

本稿では,順序を入れ替えて モデルの学習 => 実データでの予測 => ハイパーパラメータ最適化 の順に解説していきます.

モデルの学習

モデルの学習バッチは以下のようなシェルスクリプトを実行します.

  • train
#!/bin/bash

allennlp train \
  config/ner.jsonnet \
  --serialization-dir result \
  --include-package nerman

# モデルとメトリクスのアップロード
aws s3 cp result/model.tar.gz s3://xxx/nerman/model/$TIMESTAMP/model.tar.gz
aws s3 cp result/metrics.json s3://xxx/nerman/model/$TIMESTAMP/metrics.json

準備の章で解説したように, allennlp train コマンドでモデルを学習します. --serialization-dir で指定しているディレクトリにはモデルのアーカイブ(tar.gz 形式), アーカイブファイルの中にはモデルの重みの他に標準出力・標準エラー出力,そして学習したモデルのメトリクスなどのデータが保存されます.

学習が終わったら, allennlp train コマンドによって生成されたモデルのアーカイブとメトリクスを S3 にアップロードします. (アーカイブファイルにはモデルの重みなどが保存されており,このファイルがあれば即座にモデルを復元できます.) また,メトリクスファイルも同時にアップロードしておくことで,モデルの性能をトラッキングできます.

S3
S3 の様子.実行日ごとにモデルのアーカイブとメトリクスがアップロードされる.

  • metrics.json

生成されるメトリクスファイル.性能指標だけでなく学習時間や計算時間などもわかります)

{
  "best_epoch": 19,
  "peak_worker_0_memory_MB": 431.796,
  "training_duration": "0:29:38.785065",
  "training_start_epoch": 0,
  "training_epochs": 19,
  "epoch": 19,
  "training_accuracy": 0.8916963871929718,
  "training_accuracy3": 0.8938523846944327,
  "training_precision-overall": 0.8442808607021518,
  "training_recall-overall": 0.8352005377548734,
  "training_f1-measure-overall": 0.8397161522865011,
  "training_loss": 38.08172739275527,
  "training_reg_loss": 0.0,
  "training_worker_0_memory_MB": 431.796,
  "validation_accuracy": 0.8663015463917526,
  "validation_accuracy3": 0.8688788659793815,
  "validation_precision-overall": 0.8324965769055226,
  "validation_recall-overall": 0.7985989492119089,
  "validation_f1-measure-overall": 0.815195530726207,
  "validation_loss": 49.37634348869324,
  "validation_reg_loss": 0.0,
  "best_validation_accuracy": 0.8663015463917526,
  "best_validation_accuracy3": 0.8688788659793815,
  "best_validation_precision-overall": 0.8324965769055226,
  "best_validation_recall-overall": 0.7985989492119089,
  "best_validation_f1-measure-overall": 0.815195530726207,
  "best_validation_loss": 49.37634348869324,
  "best_validation_reg_loss": 0.0,
  "test_accuracy": 0.875257568552861,
  "test_accuracy3": 0.8789031542241242,
  "test_precision-overall": 0.8318906605922551,
  "test_recall-overall": 0.8214125056230319,
  "test_f1-measure-overall": 0.8266183793571253,
  "test_loss": 48.40180677297164
}

モデルの学習は EC2 インスタンス上で実行されます. 今回のケースではデータセットは比較的小さく(全データ = 500 レシピ), BiLSTM-CRF のネットワークもそこまで大きくありません. このため,通常のバッチジョブとほぼ同じ程度の規模のインスタンスでの学習が可能です. 実行環境が GPU や大容量メモリなどのリソースを必要としないため,通常のバッチ開発のフローに乗ることができました. これにより,社内に蓄積されていたバッチ運用の知見を活かしてインフラ環境の整備にかかるコストを抑えつつ学習バッチを構築できています.

また, nerman のバッチはすべてスポットインスタンスを前提として構築されています. スポットインスタンスは通常のインスタンスよりもコストが低く, 代わりに実行中に強制終了する(spot interruption と呼ばれる)可能性があるインスタンスです. モデルの学習は強制終了されてしまってもリトライをかければよく,学習にかかる時間が長すぎなければスポットインスタンスを利用することでコストを抑えられます. (ただし,学習にかかる時間が長ければ長いだけ spot interruption に遭遇する可能性が高くなります. リトライを含めた全体での実行時間が通常のインスタンスでの実行時間と比較して長くなりすぎた場合, かえってコストがかかってしまう可能性があり,注意が必要です.)

実データでの予測

以下のようなシェルスクリプトを実行して予測を実行します.

  • predict
#!/bin/bash

export MODEL_VERSION=${MODEL_VERSION:-2020-07-08}
export TIMESTAMP=${TIMESTAMP:-`date '+%Y-%m-%d'`}

export FROM_IDX=${FROM_IDX:-10000}
export LAST_IDX=${LAST_IDX:-10100}

export KUROKO2_PARALLEL_FORK_INDEX=${KUROKO2_PARALLEL_FORK_INDEX:--1}
export KUROKO2_PARALLEL_FORK_SIZE=${KUROKO2_PARALLEL_FORK_SIZE:--1}

if [ $KUROKO2_PARALLEL_FORK_SIZE = -1 ] || [ $KUROKO2_PARALLEL_FORK_INDEX = -1 ]; then
  echo $FROM_IDX $LAST_IDX ' (without parallel execution)'

else
  if (($KUROKO2_PARALLEL_FORK_INDEX >= $KUROKO2_PARALLEL_FORK_SIZE)); then
    echo '$KUROKO2_PARALLEL_FORK_INDEX'=$KUROKO2_PARALLEL_FORK_INDEX 'must be smaller than $KUROKO2_PARALLEL_FORK_SIZE' $KUROKO2_PARALLEL_FORK_SIZE
    exit
  fi

  # ==============================================================================
  # begin: FROM_IDX ~ LAST_IDX のデータを KUROKO2_PARALLEL_FORK_SIZE の値で等分する処理
  # ==============================================================================

  NUM_RECORDS=$(($LAST_IDX - $FROM_IDX))
  echo 'NUM_RECORDS = ' $NUM_RECORDS

  if (($NUM_RECORDS % $KUROKO2_PARALLEL_FORK_SIZE != 0)); then
    echo '$KUROKO2_PARALLEL_FORK_SIZE = ' $KUROKO2_PARALLEL_FORK_SIZE 'must be multiple of $NUM_RECORDS=' $NUM_RECORDS
    exit
  fi

  DIV=$(($NUM_RECORDS / $KUROKO2_PARALLEL_FORK_SIZE))
  echo 'DIV=' $DIV

  if (($DIV <= 0)); then
    echo 'Invalid DIV=' $DIV
    exit
  fi

  LAST_IDX=$(($FROM_IDX + (($KUROKO2_PARALLEL_FORK_INDEX + 1) * $DIV) ))
  FROM_IDX=$(($FROM_IDX + ($KUROKO2_PARALLEL_FORK_INDEX * $DIV) ))
  echo '$FROM_IDX = ' $FROM_IDX ' $LAST_IDX = ' $LAST_IDX

  # ============================================================================
  # end: FROM_IDX ~ LAST_IDX のデータを KUROKO2_PARALLEL_FORK_SIZE の値で等分する処理
  # ============================================================================
fi

allennlp custom-predict \
    --from-idx $FROM_IDX \
    --last-idx $LAST_IDX \
    --include-package nerman \
    --model-path s3://xxx/nerman/model/$MODEL_VERSION/model.tar.gz

aws s3 cp \
    --recursive \
    --exclude "*" \
    --include "_*.csv" \
    prediction \
    s3://xxx/nerman/output/$TIMESTAMP/prediction/

予測バッチは学習バッチが作成したモデルを読み込み,固有表現が付与されていないレシピを解析します. また,予測バッチは並列実行に対応しています. クックパッドには 340 万品以上のレシピが投稿されており,これらのレシピを一度に解析するのは容易ではありません. このため,レシピを複数のグループに分割し,それぞれを並列に解析しています.

並列処理の様子
並列処理の様子

FROM_RECIPE_IDXLAST_RECIPE_IDX で解析対象とするレシピを指定し, KUROKO2_PARALLEL_FORK_SIZE という環境変数で並列数を設定します. 並列実行されたプロセスには KUROKO2_PARALLEL_FORK_INDEX という変数が渡されるようになっていて,この変数で自身が並列実行されたプロセスのうち何番目かを識別します. プロセスの並列化は社内で利用されているジョブ管理システム kuroko2 の並列実行機能 (parallel_fork) を利用して実現しています.

custom-predict コマンドは上で定義した変数を用いて対象となるレシピを分割し, AllenNLP のモデルを用いて固有表現を抽出するコマンドです. AllenNLP では自分でサブコマンドを登録でき,このようにすべての処理を allennlp コマンドから実行できるようになっています. サブコマンドは以下のように Python スクリプト(predict_custom.py)を作成して定義できます. (サブコマンドについての公式ドキュメントはこちら

  • custom_predict.py
import argparse

from allennlp.commands import Subcommand

from nerman.data.dataset_readers import StreamSentenceDatasetReader
from nerman.predictors import KonohaSentenceTaggerPredictor


def create_predictor(model_path) -> KonohaSentenceTaggerPredictor:
    archive = load_archive(model_path)
    predictor = KonohaSentenceTaggerPredictor.from_archive(archive)
    dataset_reader = StreamSentenceDatasetReader(predictor._dataset_reader._token_indexers)
    return KonohaSentenceTaggerPredictor(predictor._model, dataset_reader)


def _predict(
  from_idx: int,
  last_idx: int,
  model_path: str,
):

    # predictor の作成
    predictor = create_predictor(model_path)
    ...  # Redshift からデータを取ってきたりモデルに入力したりする処理(今回の記事では解説は割愛します)


def predict(args: argparse.Namespace):
    from_idx = args.from_idx
    last_idx = args.last_idx
    _predict(from_idx, last_idx)


@Subcommand.register("custom-predict")
class CustomPrediction(Subcommand):
    @overrides
    def add_subparser(self, parser: argparse._SubParsersAction) -> argparse.ArgumentParser:
        description = "Script to custom predict."
        subparser = parser.add_parser(self.name, description=description, help="Predict entities.")

        subparser.add_argument("--from-idx", type=int, required=True)
        subparser.add_argument("--last-idx", type=int, required=True)
        subparser.add_argument("--model-path", type=str, required=True)

        subparser.set_defaults(func=predict)  # サブコマンドが呼ばれたときに実際に実行するメソッドを指定する
        return subparser

model_path という変数にはモデルのアーカイブファイルのパスが指定されています. アーカイブファイルのパスは load_archive というメソッドに渡されます. load_archive は AllenNLP が提供しているメソッドであり,これを利用すると保存された学習済みモデルの復元が簡単にできます. また, load_archive はデータセットのパスと同様 S3 スキーマに対応しているため,学習バッチでアップロード先に指定したパスをそのまま利用できます. (load_archive の公式ドキュメントはこちら

文字列をモデルに入力するためには AllenNLP の Predictor という機構を利用しています. 公式ドキュメントはこちらです. 系列ラベリングモデルの予測結果を扱う際に便利な SentenceTaggerPredictor クラスを継承し,以下に示す KonohaSentenceTaggerPredictor クラスを定義しています. predict メソッドに解析したい文字列を入力すると,モデルの予測結果を出力してくれます.

from allennlp.common.util import JsonDict
from allennlp.data import Instance
from allennlp.data.dataset_readers.dataset_reader import DatasetReader
from allennlp.models import Model
from allennlp.predictors import SentenceTaggerPredictor
from allennlp.predictors.predictor import Predictor
from konoha.integrations.allennlp import KonohaTokenizer
from overrides import overrides


@Predictor.register("konoha_sentence_tagger")
class KonohaSentenceTaggerPredictor(SentenceTaggerPredictor):
    def __init__(self, model: Model, dataset_reader: DatasetReader) -> None:
        super().__init__(model, dataset_reader)
        self._tokenizer = KonohaTokenizer("mecab")

    def predict(self, sentence: str) -> JsonDict:
        return self.predict_json({"sentence": sentence})

    @overrides
    def _json_to_instance(self, json_dict: JsonDict) -> Instance:
        sentence = json_dict["sentence"]
        tokens = self._tokenizer.tokenize(sentence)
        return self._dataset_reader.text_to_instance(tokens)

nerman では,日本語のレシピデータを扱うために日本語処理ツールの konoha を利用しています. KonohaTokenizer は Konoha が提供している AllenNLP インテグレーション機能です. 日本語文字列を受け取り,分かち書きもしくは形態素解析を実施, AllenNLP のトークン列を出力します. 形態素解析器には MeCab を採用しており,辞書は mecab-ipadic を使用しています.

github.com

次に,作成したモジュールを __init__.py でインポートします. 今回は nerman/commands というディレクトリに custom_predict.py を設置しています. このため, nerman/__init__.py および nerman/commands/__init__.py をそれぞれ次のように編集します.

  • nerman/__init__.py
import nerman.commands
  • nerman/commands/__init__.py
from nerman.commands import custom_predict

コマンドの定義およびインポートができたら, allennlp コマンドで実際にサブコマンドを認識させるために .allennlp_plugins というファイルをリポジトリルートに作成します.

  • .allennlp_plugins
nerman

以上の操作でサブコマンドが allennlp コマンドで実行できるようになります. allennlp --help を実行して作成したコマンドが利用できるようになっているか確認できます.

得られた予測結果は CSV 形式のファイルとして保存され,予測が終了した後に S3 へアップロードされます.

次に, S3 にアップロードした予測結果をデータベースに投入します. データは最終的に Amazon Redshift (以降 Redshift) に配置されますが, Amazon Aurora (以降 Aurora)を経由するアーキテクチャを採用しています. これは Aurora の LOAD DATA FROM S3 ステートメントという機能を利用するためです. LOAD DATA FROM S3 ステートメントは次のような SQL クエリで利用できます.

  • load.sql
load
    data
from
    S3 's3://xxx/nerman/output/$TIMESTAMP/prediction.csv'
into table recipe_step_named_entities
    fields terminated by ','
    lines  terminated by '\n'
    (recipe_text_id, start, last, name, category)
    set created_at = current_timestamp, updated_at = current_timestamp;

このクエリを実行することで, S3 にアップロードした CSV ファイルを直接 Amazon Aurora にインポートできます. LOAD DATA FROM S3 については AWS の公式ドキュメント が参考になります. バッチサイズやコミットのタイミングの調整の手間が必要なくなるため,大規模データをデータベースに投入する際に非常に便利です.

Aurora のデータベースに投入した予測結果は pipelined-migrator という社内システムを利用して定期的に Redshift へ取り込まれます. pipelined-migrator を利用することで,管理画面上で数ステップ設定を行うだけで Aurora から Redshift へデータを取り込めます. これにより, S3 からのロードと pipelined-migrator を組み合わせた手間の少ないデータの投入フローが実現できました.

解析結果をスタッフに利用してもらう方法として,データベースを利用せずに予測 API を用意する方法も考えられます. 今回のタスクの目標は「すでに投稿されたレシピからの料理用語の自動抽出」であり,これはバッチ処理であらかじめ計算可能です. このため, API サーバを用意せずにバッチ処理で予測を行う方針を採用しました.

また,エンジニア以外のスタッフにも予測結果を使ってみてもらいたいと考えていました. クックパッドはエンジニア以外のスタッフも SQL を書ける方が多いため, 予測結果をクエリ可能な形でデータベースに保存しておく方針はコストパフォーマンスがよい選択肢でした. 予測結果を利用するクエリ例を以下に示します.

  • list_tools.sql
select
    , s.recipe_id
    , e.name
    , e.category
from
    recipe_step_named_entities as e
    inner join recipe_steps as s on e.step_id = s.id
where
    e.category in ('Tg')
    and s.recipe_id = xxxx

このクエリを Redshift 上で実行することで,レシピ中に出現する調理器具のリストを取得できるようになりました.

SQL の実行結果
SQL の実行結果

Optuna を用いたハイパーパラメータの分散最適化

最後にハイパーパラメータの最適化について解説します.

github.com

nerman では Optuna を用いたハイパーパラメータの最適化を実施しています. Optuna は Preferred Networks (PFN) が開発しているハイパーパラメータ最適化のライブラリです. インストールは pip install optuna をターミナルで実行すれば完了します.

Optuna では,各インスタンスから接続可能なバックエンドエンジン(RDB or Redis)を用意し,それをストレージで使用することで, 複数インスタンスを利用した分散環境下でのハイパーパラメータ最適化を実現できます. (ストレージは Optuna が最適化結果を保存するために使用するもので,RDB や Redis などを抽象化したものです) インスタンスをまたいだ分散最適化を実施する場合,ストレージのバックエンドエンジンは MySQL もしくは PostgreSQL が推奨されています (Redis も experimental feature として利用可能になっています). 詳しくは公式ドキュメントをご参照ください. 今回はストレージとして MySQL (Aurora) を採用しています.

Optuna には AllenNLP のためのインテグレーションモジュールが存在します. しかしながら,このインテグレーションモジュールを使うと自身で最適化を実行するための Python スクリプトを記述する必要があります. そこで, AllenNLP とよりスムーズに連携するために allennlp-optuna というツールを開発しました. allennlp-optuna をインストールすると,ユーザは allennlp tune というコマンドで Optuna を利用したハイパーパラメータ最適化を実行できるようになります. このコマンドは allennlp train コマンドと互換性が高く, AllenNLP に慣れたユーザはスムーズにハイパーパラメータの最適化を試せます.

github.com

allennlp tune コマンドを実行するには,まず pip install allennlp-optuna.gitallennlp-optuna をインストールします. 次に, .allennlp_plugins を以下のように編集します.

  • .allennlp_plugins
allennlp-optuna
nerman

allennlp --help とコマンドを実行して,以下のように retrain コマンドと tune コマンドが確認できればインストール成功です.

$ allennlp --help
2020-11-05 01:54:24,567 - INFO - allennlp.common.plugins - Plugin allennlp_optuna available
usage: allennlp [-h] [--version]  ...

Run AllenNLP

optional arguments:
  -h, --help     show this help message and exit
  --version      show program's version number and exit

Commands:

    best-params  Export best hyperparameters.
    evaluate     Evaluate the specified model + dataset.
    find-lr      Find a learning rate range.
    predict      Use a trained model to make predictions.
    print-results
                 Print results from allennlp serialization directories to the console.
    retrain      Train a model with hyperparameter found by Optuna.
    test-install
                 Test AllenNLP installation.
    train        Train a model.
    tune         Optimize hyperparameter of a model.

allennlp-optuna が無事にインストールできました. 次に allennlp-optuna を利用するために必要な準備について解説します.

設定ファイルの修正

はじめに,準備の章で作成した config.jsonnet を以下のように書き換えます.

  • config.jsonnet (allennlp-optuna 用)
// ハイパーパラメータを変数化する
local lr = std.parseJson(std.extVar('lr'));
local lstm_hidden_size = std.parseInt(std.extVar('lstm_hidden_size'));
local dropout = std.parseJson(std.extVar('dropout'));
local word_embedding_dim = std.parseInt(std.extVar('word_embedding_dim'));

local cuda_device = -1;

{
  dataset_reader: {
    type: 'cookpad2020',
    token_indexers: {
      word: {
        type: 'single_id',
      },
    },
    coding_scheme: 'BIOUL',
  },
  train_data_path: 'data/cpc.bio.train',
  validation_data_path: 'data/cpc.bio.dev',
  model: {
    type: 'crf_tagger',
    text_field_embedder: {
      type: 'basic',
      token_embedders: {
        word: {
          type: 'embedding',
          embedding_dim: word_embedding_dim,
        },
      },
    },
    encoder: {
      type: 'lstm',
      input_size: word_embedding_dim,
      hidden_size: lstm_hidden_size,
      dropout: dropout,
      bidirectional: true,
    },
    label_encoding: 'BIOUL',
    calculate_span_f1: true,
    dropout: dropout,  // ここで宣言した変数を指定する
    initializer: {},
  },
  data_loader: {
    batch_size: 10,
  },
  trainer: {
    num_epochs: 20,
    cuda_device: cuda_device,
    optimizer: {
      type: 'adam',
      lr: lr,  // ここで宣言した変数を指定する
    },
  },
}

最適化したいハイパーパラメータを local lr = std.parseJson(std.extVar('lr')) のように変数化しています. std.extVar の返り値は文字列です.機械学習モデルのハイパーパラメータは整数や浮動小数であることが多いため,キャストが必要になります. 浮動小数へのキャストは std.parseJson というメソッドを利用します.整数へのキャストは std.parseInt を利用してください.

探索空間の定義

次に,ハイパーパラメータの探索空間を定義します. allennlp-optuna では,探索空間は次のような JSON ファイル(hparams.json)で定義します.

  • hparams.json
[
  {
    "type": "float",
    "attributes": {
      "name": "dropout",
      "low": 0.0,
      "high": 0.8
    }
  },
  {
    "type": "int",
    "attributes": {
      "name": "lstm_hidden_size",
      "low": 32,
      "high": 256
    },
  },
  {
    "type": "float",
    "attributes": {
      "name": "lr",
      "low": 5e-3,
      "high": 5e-1,
      "log": true
    }
  }
]

今回の例では学習率とドロップアウトの比率が最適化の対象です. それぞれについて,値の上限・下限を設定します. 学習率は対数スケールの分布からサンプリングするため, "log": true としていることに注意してください.

最適化バッチは次のようなシェルスクリプトを実行します.

  • optimize
#!/bin/bash

export N_TRIALS=${N_TRIALS:-20}  # Optuna の試行回数を制御する
export TIMEOUT=${TIMEOUT:-7200}  # # 一定時間が経過したら最適化を終了する(単位は秒): 60*60*2 => 2h
export TIMESTAMP=${TIMESTAMP:-`date '+%Y-%m-%d'`}

export OPTUNA_STORAGE=${OPTUNA_STORAGE:-mysql://$DB_USERNAME:$DB_PASSWORD@$DB_HOST_NAME/$DB_NAME}
export OPTUNA_STUDY_NAME=${OPTUNA_STUDY_NAME:-nerman-$TIMESTAMP}

# ハイパーパラメータの最適化
allennlp tune \
  config/ner.jsonnet \
  config/hparam.json \
  --serialization-dir result/hpo \
  --include-package nerman \
  --metrics best_validation_f1-measure-overall \
  --study-name $OPTUNA_STUDY_NAME \
  --storage $OPTUNA_STORAGE \
  --direction maximize \
  --n-trials $N_TRIALS \
  --skip-if-exists \
  --timeout $TIMEOUT

このコマンドを複数のインスタンスで実行することで,ハイパーパラメータの分散最適化が実行できます. オプション --skip-if-exists を指定することで,複数のインスタンスの間で最適化の途中経過を共有しています. Optuna は通常実行のたびに新しく実験環境(study と呼ばれます)を作成し,ハイパーパラメータの探索を行います. このとき,すでにストレージに同名の study が存在する場合はエラーになります. しかし, --skip-if-exists を有効にすると,ストレージに同名の study がある場合は当該の study を読み込み,途中から探索を再開します. この仕組みによって,複数のインスタンスで --skip-if-exists を有効にして探索を開始することでだけで study を共有した最適化が行われます. 上記のスクリプトによって,最適化バッチは与えられた時間(--timeout で設定されている値 = 2 時間)に最大 20 回探索を実行します.

このように, Optuna のリモートストレージ機能によって,複数のインスタンスで同じコマンドを実行するだけで分散最適化が実現できました! Optuna の分散ハイパーパラメータ最適化の詳しい仕組み,あるいはより高度な使い方については Optuna 開発者の 解説資料 が参考になるので, 興味のある方は合わせてご参照ください.

モデルの再学習

最後に,最適化されたハイパーパラメータを用いてモデルを再学習します. 再学習バッチは以下のようなシェルスクリプトで実行します.

  • retrain
#!/bin/bash

export TIMESTAMP=${TIMESTAMP:-`date '+%Y-%m-%d'`}

export OPTUNA_STORAGE=${OPTUNA_STORAGE:-mysql://$DB_USERNAME:$DB_PASSWORD@$DB_HOST_NAME/$DB_NAME}

# 最適化されたハイパーパラメータを用いたモデルの再学習
allennlp retrain \
  config/ner.jsonnet \
  --include-package nerman \
  --include-package allennlp_models \
  --serialization-dir result \
  --study-name $OPTUNA_STUDY_NAME \
  --storage $OPTUNA_STORAGE

# モデルとメトリクスのアップロード
aws s3 cp result/model.tar.gz s3://xxx/nerman/model/$TIMESTAMP/model.tar.gz
aws s3 cp result/metrics.json s3://xxx/nerman/model/$TIMESTAMP/metrics.json

このシェルスクリプトでは allennlp-optuna が提供する retrain コマンドを利用しています. allennlp retrain コマンドはストレージから最適化結果を取得し,得られたハイパーパラメータを AllenNLP に渡してモデルの学習を行ってくれます. tune コマンド同様, retrain コマンドも train コマンドとほぼ同じインターフェースを提供していることがわかります.

再学習したモデルのメトリクスを以下に示します.

  • metrics.json
{
  "best_epoch": 2,
  "peak_worker_0_memory_MB": 475.304,
  "training_duration": "0:45:46.205781",
  "training_start_epoch": 0,
  "training_epochs": 19,
  "epoch": 19,
  "training_accuracy": 0.9903080859981059,
  "training_accuracy3": 0.9904289830542626,
  "training_precision-overall": 0.9844266427651112,
  "training_recall-overall": 0.9843714989917096,
  "training_f1-measure-overall": 0.9843990701061036,
  "training_loss": 3.0297666011196327,
  "training_reg_loss": 0.0,
  "training_worker_0_memory_MB": 475.304,
  "validation_accuracy": 0.9096327319587629,
  "validation_accuracy3": 0.911243556701031,
  "validation_precision-overall": 0.884530630233583,
  "validation_recall-overall": 0.8787215411558669,
  "validation_f1-measure-overall": 0.8816165165824231,
  "validation_loss": 61.33201414346695,
  "validation_reg_loss": 0.0,
  "best_validation_accuracy": 0.9028672680412371,
  "best_validation_accuracy3": 0.9048002577319587,
  "best_validation_precision-overall": 0.8804444444444445,
  "best_validation_recall-overall": 0.867338003502627,
  "best_validation_f1-measure-overall": 0.873842082046708,
  "best_validation_loss": 38.57948366800944,
  "best_validation_reg_loss": 0.0,
  "test_accuracy": 0.8887303851640513,
  "test_accuracy3": 0.8904739261372642,
  "test_precision-overall": 0.8570790531487271,
  "test_recall-overall": 0.8632478632478633,
  "test_f1-measure-overall": 0.8601523980277404,
  "test_loss": 44.22851959539919
}

モデルの学習 の章で学習されたモデルと比較して,テストデータでの F値(test_f1-measure-overall)が 82.7 から 86.0 となり, 3.3 ポイント性能が向上しました. ハイパーパラメータの探索空間をアバウトに定めて Optuna に最適化をしてもらえば十分な性能を発揮するハイパーパラメータが得られます.便利です.

Optuna はハイパーパラメータを最適化するだけでなく, 最適化途中のメトリクスの推移やハイパーパラメータの重要度などを可視化する機能, 最適化結果を pandas DataFrame で出力する機能をはじめとする強力な実験管理機能を提供しています. より詳しく AllenNLP と Optuna の使い方を学びたい方は AllenNLP の公式ガイド なども合わせて読んでみてください.

まとめ

本稿では AllenNLP と Optuna を用いて構築した固有表現抽出システム nerman について紹介しました. nerman は AllenNLP を用いたモデル学習・実データ適用, Amazon Aurora を活用したデータ投入の手間の削減, および Optuna を活用したスケーラブルなハイパーパラメータ探索を実現しています. AllenNLP と Optuna を用いた機械学習システムの一例として,読んでくださった皆さんの参考になればうれしいです.

クックパッドでは自然言語処理の技術で毎日の料理を楽しくする仲間を募集しています. 実現したい価値のため,データセットの構築から本気で取り組みたいと考えている方にはとても楽しめる環境だと思います. 興味をもってくださった方はぜひご応募ください! クックパッドの {R&D, サービス開発現場} での自然言語処理についてカジュアルに話を聞きたいと思ってくださった方は @himkt までお気軽にご連絡ください.