巨大なバッチを分割して構成する 〜SQLバッチフレームワークBricolage〜

トレンド調査ラボの青木峰郎(id:mineroaoki)です。 好きなRubyのメソッドは10年前からString#slice(re, nth)ですが、 最近はRubyよりCoffeeScriptとSQLのほうが書く量が多くて悩んでいます。

今日はわたしが開発している「たべみる」の背後で働いている 巨大バッチの構成について話したいと思います。

たべみるのバッチは約3000行のSQLで構成されており、 処理時間が1日で4時間程度かかる、そこそこの規模のプログラムです。 このバッチ処理プログラムをBricolage(ブリコラージュ)というフレームワークで構造化する手法について説明します。

「たべみる」とは

まず最初に、「たべみる」がどういうものなのかごく簡単にお話ししておきましょう。

「たべみる」は企業のみに提供しているB2Bの分析サービスで、 クックパッドのレシピ検索の分析をすることができます。 具体的には、特定の語の検索頻度や、どんな語と一緒に検索されているか、 それから急激に検索が増えている語などを知ることができます。

例えば次は「バレンタイン」という語の検索頻度のグラフです。

▼「バレンタイン」の検索頻度グラフ(2014年〜2015年) f:id:mineroaoki:20150626162112p:plain

このグラフを見ると、「バレンタイン」という語の検索頻度は12月26日から上昇を始めていることが見てとれます。 つまりクリスマスが終わったとみるや次のイベント(バレンタイン)に向けて準備を始めているのですね。 恐ろしいことです。

「たべみる」バッチの構成

たべみるは上記の画面を500ミリ秒未満で表示することができます。 これは一般的なRailsアプリに比べれば遅いほうですが、 Amazon Redshiftに直接アクセスして6年分のデータ(10億件を余裕で越えます) に対して分析を行っていることを考えると、実は非常に高速と言える速度なのです。

このような分析を高速に実行できるようにするために、 背後では事前に集計を済ませた、いわゆる「サマリーテーブル」を大量に作成しています。 このサマリーテーブルを作るのが、たべみるバッチの主な仕事です。

たべみるバッチは日次(1日に1回の頻度)で動き、 次のような仕組みでサマリーテーブルを更新します。

▼たべみるバッチ概要フロー f:id:mineroaoki:20150626162328p:plain

まず元データをcookpad.comのメインデータベースであるMySQLと、 Treasure Data(Hadoop)から取得してRedshiftに入れます。 そのあとのキーワードの名寄せや集計はすべてRedshift上で行っています。

たべみるバッチではほとんどのデータ加工処理をSQLで記述しているので、 Redshiftの並列処理の恩恵を十分に受けることができます。

またSQLでの処理に更新(update)や削除(delete)はほぼ存在せず、 90%以上の処理は次のようなinsert selectで行います。 これは処理を羃等にし、vacuumを不要にするための工夫です。

insert into keyword_combination_recipe_sets
select
    l.item_id as item_id
    , r.item_id as pairing_item_id
    , l.recipe_id
from
    keyword_recipe_sets l
    inner join keyword_recipe_sets r
    on
        l.recipe_id = r.recipe_id
        and l.item_id <> r.item_id
where
            :
            略
            :
;

巨大なバッチをジョブに分割する

たべみるバッチのSQLが3000行とは言っても、もちろん1つのSQLが3000行ではありません。 上記のようなinsert select文を使った20〜30行くらいのSQLをたくさん組み合わせることで バッチは構成されています。

さて、この大量のSQLをどうやって実行していくべきでしょうか。 とにかく実行するだけでいいのであれば簡単ですね。 例えばRubyなら次のようにpgライブラリを使えばSQLを実行できるので、 このコードでひたすら実行していけばいいわけです。

require 'pg'

conn = PG::Connection.open(host: 'localhost', port: 5444, dbname: 'production', user: 'tabemirudev', password: '')
begin
  conn.query(<<-SQL)
      insert into base_search_counts
      select
          …略…
      ;   
  SQL

  conn.query(<<-SQL)
      insert into item_search_counts
      select
          …略…
      ;
  SQL

  conn.query(<<-SQL)
      insert into daily_si
      select
          …略…
      ;
  SQL

  conn.query(<<-SQL)
      insert into weekly_si
      select
          …略…
      ;
  SQL

  # 必要なだけたくさんSQLを書く
ensure
  conn.close
end

これではまずいのでしょうか?

当然、まずいのです。 まずいと言っても、メソッドに分けるべきとかそういうレベルの問題ではありません。 例えば次のような疑問・要望が湧いてきます。

  • 処理が失敗したとき、どうやって原因を追うのか?
  • 途中で失敗したらどうなるか? 失敗したところから再起動できるか?
  • SQLの単体テストはどうすればいいのか?

「運用しやすいバッチ」とは、「失敗したときに簡単に対処できるバッチ」のことです。 どこで、何が原因で失敗したのかすぐ特定でき、 問題を解決したら失敗したところから実行を再開できるのがよいバッチです。 しかもそのうえで開発しやすく、テストしやすい仕組みならベストでしょう。

このような要望を満たすために、 大きなバッチは小さな処理ごとに複数の「ジョブ」(プログラム)へ分割して、 バッチジョブの組み合わせで全体を構成するのが一般的です。 例えば1つのジョブは1つのSQL(insert select文など)を実行するだけのプログラムとし、 そのようなジョブをたくさん作ることで全体を組み立てるわけです。

SQLバッチフレームワークBricolageによるジョブ化

このようなバッチジョブを作るために、 たべみるでは独自開発したBricolageというフレームワークを使っています。 「SQL」バッチフレームワークとは言っていますが、現在は主にRedshiftを想定した実装になっています。 Bricolageは先日GitHubでOSSとして公開しました(下記URL)ので、誰でも自由に使うことができます。

Bricolageを使ってジョブを作ると決まった場所へ自動的にログが取られますし、 後述するような様々な機能を活用することで容易に開発や運用ができるようになっています。

たべみるの日次バッチは現在200程度のBricolageジョブで構成されています。 その200くらいのジョブの依存関係を図で表すと次のようになります。

▼ジョブフロー図 f:id:mineroaoki:20150626152818p:plain

Bricolageは1つ1つのジョブを作るためのフレームワークなので、 複数のジョブを連動させて上記のようなジョブフローとして動かす仕組みはあまり作り込んでいません。 いまのところ、ジョブフローをCUIで非並列で動作させる簡単な仕組み(bricolage-jobnetコマンド)だけを用意しています。

ジョブフローを実行する仕組みはジョブ管理システムに任せるのがベストでしょう。 代表的なジョブ管理システムとしては、 日立のJP1/AJS3NRIの千手IBMのTivoli Workload Scheduler などが挙げられます。

またOSSのジョブ管理システムとしては、 Hinemos(NTTデータ)、 RundeckAzkaban(LinkedIn)、 Airflow(AirBnB)などがあります。

機能だけで言えば個人的にはJP1/AJS3が使いたいのですが、ちょっと高いのと、運用が大変なので導入していません。 しかし最近、バッチの時間がのびてジョブフロー全体の最適化が緊急の課題になりつつあるので、 そろそろ何かしらのジョブ管理システムを導入する予定です。 ジョブフローのGUIは絶対にほしいので、まずはHinemosから試そうと思っています。

Bricolageの特長

Bricolageには次のような特長があります。

  1. SQLにパラメーターを埋め込める
  2. SQLの定型パターンを自動生成できる
  3. dry-runとexplainが可能

利点1. SQLにパラメーターを埋め込める

Bricolageでは「$変数名」のような記法でSQLに任意のテキストを埋め込めるようにしています。 例えば最初にinsert selectの例として見せたSQLは実は変数の展開後のコードで、 ソースファイルには次のように書かれています。

insert into $dest_table
select
    l.item_id as item_id
    , r.item_id as pairing_item_id
    , l.recipe_id
from
    $keyword_recipe_sets l
    inner join $keyword_recipe_sets r
    on
        l.recipe_id = r.recipe_id
        and l.item_id <> r.item_id
where
            :
            略
            :
;

あえてprepared statementを無視して純粋に文字列ベースでパラメーター展開(置換)をしているので、 式やテーブル名、カラム名もパラメーターにすることができます。 上記のSQLでもテーブル名を変数にしています($dest_tableと$keyword_recipe_sets)。

また、バッチ全体、サブシステム、ジョブのそれぞれのスコープで変数を定義でき、 実行時に上書きすることも可能です。 そのため、本番で一時的にターゲットテーブル名を差し替えてコピーを作る、など柔軟に運用することができます。

さらに、eRubyのタグを使ってRubyの式を埋め込むこともできます。 例えば本番用のサマリーテーブルを複数まとめて切り替えるためのジョブでは次のようなコードを使っています。

begin transaction;
<% publish_tables.each do |table| %>
  alter table $schema.<%= table %> rename to <%= table %>_org;
  alter table $schema.<%= table %>${target_suffix} rename to <%= table %>;
<% end %>
commit;

この切り替えの処理対象となるテーブルは20以上あり、しかも頻繁に増減するので、 手でテーブルリストをメンテナンスするのは事故のもとです。 この方式ならばそのような変化するテーブル群をまとめて扱うことができます。

ちなみに対象となるテーブルは次のようにテーブル定義にBricolage独特の記法で 「publish」属性を付けることで宣言できるようになっています。 「--attributes」で始まっている行が属性の宣言です。

--dest-table: ANYly_stats
--attributes: [publish, replicate]

create table $dest_table
( data_date date encode delta
, item_id int encode lzo

, si real encode raw
, sv real encode raw
, ysi real encode raw

, ci real encode raw
, csv real encode raw
, yci real encode raw
)
distkey (item_id)
sortkey (data_date)
;

利点2. SQLの定型パターンを自動生成できる

2つめの利点は、よく使うSQLの定型パターンをオプションだけで自動生成できることです。

例えばテーブルを洗い替えする場合、ターゲットテーブルと同じ定義のテーブルを別途作成し、 insert selectを完了してからrename(alter table)ですりかえるという手法をわたしは好んで使います。

そのような場合、メイン処理となるinsert select文の他に、 テンポラリーテーブルのdrop table文、create table文や、renameのためのalter table文なども必要になります。 またRedshiftの場合は統計をとるanalyze文やソート順を改善するvacuum sort only、 それに権限を与えるgrantも合わせて実行したいところです。 つまり全体では次のようなSQLになるわけですね。

drop table $dest_table;

create table $dest_table
( ……
, ……
);

insert into $dest_table
select
    ……
;

vacuum sort only $dest_table;

analyze $dest_table;

grant select on $dest_table to $tabemiru_reader_group;

たべみるの場合、余裕で100以上のテーブルがあるので、 これらについていちいちdrop、create、rename……と書きたくはありません。 そこでBricolageでは「ジョブクラス」という仕組みを使ってこれらを生成します。

ジョブクラスは、ジョブで実行されるSQL文のテンプレートです。 例えば「rebuild-rename」というジョブクラスを指定すると、 さきほど述べたdrop、create、renameはBricolageが生成してくれます。 開発者が書かなければいけないのはメインのinsert select文だけです。

また、ほとんどのジョブクラスにはanalyzeオプションやvacuum-sortオプションが 設定されており、これをtrueにするだけで処理後にanalyze、vacuum sort onlyが実行されます。

ジョブクラスとそのオプションはジョブごとに「xxx.job」という名前のYAMLファイルで 指定するようになっています。例えば次のような感じです。

class: rebuild-drop
dest-table: $app_schema.weekly_stats${target_suffix}
src-tables:
    weekly_si: $work_schema.weekly_si
    yearly_si: $work_schema.yearly_si
    latest_yearly_si: $work_schema.latest_yearly_si
    moving_si: $work_schema.moving_si

    weekly_ci: $work_schema.weekly_ci
    yearly_ci: $work_schema.yearly_ci
    latest_yearly_ci: $work_schema.latest_yearly_ci
    moving_ci: $work_schema.moving_ci
table-def: weekly_stats.ct
vacuum-sort: true
analyze: true
grant:
    privilege: select
    to: "$tabemiru_reader"

もちろんこの他に、MySQLからRedshiftへのテーブルコピーのように利用頻度が高いパターンも 「ジョブクラス」として用意してあります。

利点3. dry-runとexplainが可能

Bricolageで作成したジョブはbricolageコマンドで実行できますが、 そのとき、次のように--dry-runオプションを付けることで、SQLを実行せずに表示だけさせることができます。 パラメーターとeRubyタグも展開されて表示されます。

% bricolage --dry-run --job recipe_sets/keyword_combination_recipe_sets-rebuild.sql.job
                                                                            -- 最初のほうはオプションで自動生成されている
\timing on

\set ON_ERROR_STOP false
drop table tabemirudev.keyword_combination_recipe_sets cascade;
\set ON_ERROR_STOP true

-- recipe_sets/keyword_combination_recipe_sets.ct          ここはテーブル定義ファイルから読み込まれた
create table tabemirudev.keyword_combination_recipe_sets
( item_id int encode delta
, pairing_item_id int encode delta
, recipe_id int encode lzo
)
distkey (recipe_id)
sortkey (item_id)
;

-- recipe_sets/keyword_combination_recipe_sets-rebuild.sql.job     以下がSQLファイルに書いた部分
insert into tabemirudev.keyword_combination_recipe_sets
select
    l.item_id as item_id
    , r.item_id as pairing_item_id
    , l.recipe_id
from
    tabemirudev.keyword_recipe_sets l
    inner join tabemirudev.keyword_recipe_sets r
    on
        l.recipe_id = r.recipe_id
        and l.item_id <> r.item_id
where
        以下略

バッチでdry-runができることがどれだけありがたいかは、 バッチを運用したことのある人ならよくわかるでしょう。 実行されるSQLが事前にわかるのは精神衛生上たいへんいい効果があります。

また、--explainオプションを付けて実行すると、 メインとなるinsert selectの部分だけexplainを付けて実行することもできます。 こちらは実際にSQLがサーバーに流れるので、 文法と意味解析チェック(型チェックとか)の代わりとしても使うことが可能です。

Bricolageのその他の機能

ここで説明したBricolageの機能は一部にすぎません。 この他にBricolageには複数データソースを管理して切り替えられる機能や、 ストリーミングロードの仕組みなどが用意されています。

本番投入優先で実装しているのでドキュメントがだいぶ雑なのですが、 気になったかたはぜひGitHubのWikiを眺めてみてください。

Bricolageの今後の開発予定

今後、Bricolageでは次の2つの機能の導入を予定しています。

  1. ジョブ管理システム
  2. ジョブ全体のユニットテスト機能

ジョブ管理システムはHinemosから試すつもり、と書きましたが、 それはそれとして選ぶのも面倒になってきたので、自分で書いてしまおうかなとも思っています。 ジョブ管理システムはいろいろ面倒なことが多いので自分で書くのは避けてきたのですが、 いつまでたっても手頃なものが出てこないので痺れを切らしました。

それから、他にぜひ導入したいのがジョブのユニットテスト機能です。 入力データと正解データを書いておいたら自動的に検証してくれるような機能を予定しています。

『10年戦えるデータ分析入門』6月30日発売!

最後に個人的な宣伝です。

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

わたしの、たぶん10冊目の著書 『10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く』 が、来週6月30日に発売されます。 池袋ジュンク堂など一部の書店ではすでに先行販売を実施中です。

本の発売タイミングがあまりにもブログ当番のタイミングと合いすぎていて 作為的なものすら感じますが、なんとまったくの偶然です。 わたし自身もびっくりしました。

RedshiftやHadoop(Hive)、Presto、Sparkのような並列分析データベースがメジャーになるにつれ、 SQLによるデータ分析は適用範囲を増しています。 これらのシステムの適用範囲は現在のところその場で考えながらクエリーを投げる探索型の分析がメインですが、 その次には分析クエリーをバッチとして定型化・固定化してダッシュボードで監視したり、 システム連携するパターンも増えていくでしょう。 今回紹介した仕組みはそのような場面で役立つはずです。 ぜひ手元のツールボックスの1つとして分析バッチの仕組みを加えておいてください。