冪等なデータ処理ジョブを書く

こんにちは、マーケティングサポート事業部データインテリジェンスグループの井上寛之(@inohiro)です。普段はマーケティングに使われるプライベートDMP(データマネジメントプラットフォーム)の開発を行っています。本稿では、その過程で得られた冪等なデータ処理ジョブの書き方に関する工夫を紹介したいと思います。今回は、RDBMS上で SQL によるデータ処理を前提に紹介しますが、この考え方は他の言語や環境におけるデータ処理についても応用できるはずです。

まずクックパッドのDMPと、冪等なジョブについて簡単に説明し、ジョブを冪等にするポイントを挙げます。また、SQL バッチジョブフレームワークである bricolage を使った、冪等なジョブの実装例を示します。

クックパッドのDMPと冪等なジョブ

クックパッドのプライベートDMPは、データウェアハウス(社内の巨大な分析用データベースで、クックパッドでは Amazon Redshift を使っている。以下 DWH) 上で構築されており、主に cookpad.com 上のターゲット広告や、社内のデータ分析に活用されています。材料となるデータは、広告のインプレッションログや、クックパッド上での検索・レシピ閲覧ログです。また他社から得たデータを DWH に取り込んで、活用したりしています。

これらのデータを活用したバッチジョブ群は、社内でも比較的大きめのサイズになっており、途中でジョブが止まってしまうことも考慮して、基本的にそれぞれのジョブが冪等な結果を生成するように開発されています。

冪等についての詳しい説明は省略しますが、簡単に言うと「あるジョブを何度実行しても、同じ結果が得られる」ということです。特にデータ処理の文脈においては、「途中で集計ジョブが失敗してしまったがために、ある日のデータが重複・欠損して生成されていた」ということはあってはなりません。ジョブが冪等になるように開発されていれば、失敗した場合のリトライも比較的簡単になります。また、ジョブが失敗しなかったとしても、(オペミス等で)たまたま複数回実行されるかもしれませんし、毎回同じ結果が生成されるべきです。

さらに、ジョブを冪等になるように開発すると、開発時に手元で試しに実行してみるときも検証が簡単なため、おすすめです。

冪等なジョブにするポイント

プライベート DMP を開発して得られた、ジョブを冪等にするためのポイントはズバリ「トランザクションを使え」です。

トランザクションを使ってロールバック

大量のデータを、長時間(N時間)かけて書き込むようなバッチジョブを考えるとき、途中で止まってしまったり、そこから復旧(リトライ)するという状況は予め考慮されているべきです。このとき、書き込む先がトランザクションをサポートするようなデータベース(一般的なRDBMSなど)ならば、トランザクションを利用しましょう。一つのトランザクションとしてまとめた一連の処理は、「すべて成功した状態」か、「すべて失敗した状態(ロールバック)」のどちらかになることが保証され、中途半端な状態にはなりません。途中で失敗しても、最初からぜんぶ書き直すことになりますが、冪等性は保たれています。

クックパッドの DMP は並列分散 RDB である Amazon Redshift 上に構築されているので、トランザクションをフルに活用しています。

自前でロールバック

一度実行された集計ジョブを再度実行した場面を考えてみます。再度実行される理由はいろいろ考えられますが、「意図せず間違って実行されてしまった」というのも同じような状況と考えられます。前回実行したときと同じ結果が得られれば問題ありませんが、集計した結果が重複してしまうと、後続のジョブが失敗するか、最悪の場合正しくない分析結果を用いて、何らかの意思決定が行われてしまうかもしれません。

つまり、現在実行中のジョブが書き込むテーブルに、今から書き込もうとしている条件で、既にデータが書き込まれているかもしれないのです。そこで、新たな結果をを書き込む前に、既存の行を削除(自前でロールバック)することで重複の発生を避けます。さらに、「削除」と「新しい結果の書き込み」を一つのトランザクションにまとめることで、このジョブは冪等になります。

冪等なデータ構造を利用する

一方で、トランザクションをサポートしないような NoSQL データベースを使っているとき、ジョブを冪等にするのは比較的簡単ではありません。このような状況で考えられる一つの解決策として、何度書き込まれても結果が変わらないデータ構造の利用が挙げられます。集合(Set)やハッシュテーブルです。これらのデータ構造は、データの順序は保証されないものの、既に存在する値(もしくはキー)を書き込んでも、要素が重複しません。

クックパッドの DMP で作成したターゲット広告用のデータは、最終的に Amazon DynamoDB *1 に書き込まれ、広告配信サーバーがそのデータを使っています。ターゲット広告用のデータは、一度に数千万要素をバッチジョブが並列で書き込みますが、このジョブが稀に失敗することがあったり、過去に書き込まれている要素が時を経て再度書き込まれることがあるため、SS(文字列のセット)型を使っています。過去には Redis のセット型を使っていることもありました。

bricolage による冪等なジョブの実装例

クックパッドの DMP だけでなく、社内で SQL バッチジョブを書くときのデファクトスタンダードになっている bricolage には、頻出パターンのジョブを書く際に便利な「ジョブ・クラス」がいくつかあり、これを使うことで冪等なジョブを簡単に実装することができます。この節では bricolage を使った「トランザクションでロールバック」パターンと、「自前でロールバック」パターンの実装例を示します。

bricolage については、ここでは詳しく説明しませんが、詳細については過去の記事「巨大なバッチを分割して構成する 〜SQLバッチフレームワークBricolage〜」や、RubyKaigi 2019 でのLT「Write ETL or ELT data processing jobs with bricolage.」をご参照ください。また inohiro/rubykaigi2019_bricolage_demo にデモプロジェクトを置いてあります。

「トランザクションでロールバック」パターン

rebuild-drop もしくは rebuild-rename ジョブ・クラスを使うと、「現行のテーブルを削除し、新規のテーブルに集計結果を書き込む」または「新規にテーブルを作り、集計結果を書き込み、現行のテーブルとすり替える」という操作を、一つのトランザクションで行うジョブを簡単に実装することができます。rebuild-drop は対象のテーブルを作り直す前に drop table し、rebuild-rename はすり替えられた古いテーブルを、別名で残しておきます。

以下は、毎日作り変えられるようなサマリーテーブルを rebuild-drop ジョブ・クラスで実装した例です。

/*
class: rebuild-drop -- ジョブ・クラスの指定
dest-table: $public_schema.articles_summary
table-def: articles_summary.ct
src-tables:
  pv_log: $public_schema.pv_log
analyze: false
*/

insert into $dest_table
select
    date_trunc('day', logtime)::date as day
    , id_param::integer as article_id
    , count(*) as pv
from
    $pv_log
where
    controller = 'articles' and action = 'show'
    and logtime < '$today'::date
group by
    1, 2
;

このジョブは、以下の SQL に変換されて実行されます。

\timing on

begin transaction; -- トランザクション開始

drop table if exists public.articles_summary cascade; -- 既存テーブルの削除

/* /Users/hiroyuki-inoue/devel/github/rubykaigi2019_bricolage_demo/demo/articles_summary.ct */
create table public.articles_summary
( day date
, article_id integer
, pv bigint
)
;

/* demo/articles_summary-rebuild.sql.job */
insert into public.articles_summary
select
    date_trunc('day', logtime)::date as day
    , id_param::integer as article_id
    , count(*) as pv
from
    public.pv_log
where
    controller = 'articles' and action = 'show'
    and logtime < '2019-07-13'::date
group by
    1, 2
;

commit; -- トランザクション終了

ジョブ全体が begin transaction;commit; で囲われているので、仮に集計クエリに問題があり失敗した場合は、元のテーブルは削除されずに残ります。

「自前でロールバック」パターン

insert-delta ジョブ・クラスは既存のテーブルに差分を書き込むために利用され、差分を書き込む直前に指定した条件でdelete を実行します。また、一連の SQL は一つのトランザクションの中で行われるので、delete 直後の差分を集計するクエリが失敗しても安心です。

以下は、日毎に広告インプレッションを蓄積しているテーブルimpressions_summary に、前日($data_date*2の集計結果を書き込むジョブの例です。delete-cond: に削除条件を指定します。今回の例では、集約条件の一つである日付を指定しています。

/*
class: insert-delta -- ジョブ・クラスの指定
dest-table: $public_schema.impressions_summary
table-def: impressions_summary.ct
src-tables:
    impressions: $ad_schema.impressions
delete-cond: "data_date = '$data_date'::date" -- 削除条件の指定
analyze: false
*/

insert into $dest_table
select
    '$data_date'::date as data_date
    , platform_id
    , device_type
    , count(*) as impressions
from
    $impressions
group by
    1, 2, 3
;

このジョブは以下のような SQL に変換され、実行されます。

\timing on

begin transaction; -- トランザクション開始

delete from impressions_summary where data_date = '2019-07-12'::date; -- 既存行を指定した条件で削除

/* demo/impressions_summary-add.sql.job */
insert into impressions_summary
select
    '2019-07-12'::date as data_date
    , platform_id
    , device_type
    , count(*) as impressions
from
    ad.impressions
group by
    1, 2, 3
;

commit; -- トランザクション終了

テーブルに書き込む前に指定した条件(delete-cond: "data_date = '$data_date'::date")で delete クエリが実行され、"掃除"してから書き込むクエリが実行されるのが確認できると思います。対象の行がなければ何も削除されませんし、対象の行が存在すれば、新たな結果を書き込む前に削除されます。

まとめ

本稿では、クックパッドの DMP 開発において「冪等なデータ処理ジョブ」を書くために行われているいくつかの工夫について紹介しました。また、bricolage を使ってこれらのジョブを実装する例を示しました。

このように、トランザクションのあるデータベースを利用する場合は、なるべくその恩恵に乗っかるのがお手軽です。また、一つのジョブに色々なことを詰め込まず、ジョブを小さく保つことで、ロールバックの対象も小さくなり、失敗した場合のリトライなどもシンプルに行えると思います。bricolage のジョブ・クラスを上手に使うことで、トランザクションを利用した冪等なデータ処理ジョブを簡単に実装することができます。ぜひお試しください。

*1:この記事を書いていて思い出しましたが、Amazon DynamoDB はトランザクションをサポートしたのでした https://aws.amazon.com/jp/blogs/news/new-amazon-dynamodb-transactions/

*2:変数には前日の日付が入るように仮定しているが、ジョブのオプションで上書きが可能