Redshiftのデータをサービス改善に役立てるデータ転送システム Queuery

こんにちは、技術部データ基盤グループの佐藤です。この記事では最近業務として主に取り組んでいたDWHから外部へのデータ転送基盤であるQueuery(きゅうり)について、OSSとしてGitHubへの公開しましたのでこの記事でご紹介をします。

github.com

Queueryというシステムは2017年の春頃にid:koba789の手により作られ、クックパッドのデータ基盤における重要な立ち位置を担っています。

背景

従来、RedshiftでSELECT文などの取得系クエリを実行するためにはRedshiftに直接接続してクエリを発行していました。この方法ではクエリ結果が巨大な場合にクライアント側のリソースを逼迫させることがありました。

しかし、それを避けるためにカーソルを使おうものなら今度はたちまちRedshiftのリーダーノードの具合が悪くなってしまいます。Redshiftから巨大な結果を得るクエリを外部から実行するためには様々な工夫が必要でした。

さらに通常の(PostgreSQLプロトコルを使った)接続方式では遠隔地(別AWSリージョン)からの接続が難しかったり、よくコネクションが切れたり、コネクションが切れると結果が取得できなかったりします。AWSのSecurity Groupの設定も忘れがちです。 また、セットアップがActiveRecord経由になるため単純に設定が面倒です。しかもActiveRecordを使ったあらゆる使い方ができてしまうため、標準化も困難です。

Queueryはこれらの問題を解決するためにあります。Queueryを使うことで、クライアントはRedshiftに直接接続せずHTTP APIで取得系クエリを実行できるようになります。

f:id:ragi256:20211202115017p:plain
アプリケーションからRedshiftへ接続する手法

仕組み

QueueryはRedshiftへUnload文を投げる役割を持つAPIサーバーと、Unload結果をS3から取得するクライアントに分かれています。クライアント側から投げられたSELECTクエリをHTTP API側で受け取り、Unload文へラップしてRedshiftに投げます。クライアント側はその結果をポーリングし続け、Unloadが完了したらS3へアクセスして結果を取得するようになっています。

できうる限りQueuery利用者の開発を単純化するため、クライアントはgem化されており、Gemfileに追加して設定ファイルを追加すればすぐ利用できるようになっています。

クライアントのサンプルコード

下記のコードをクライアント側でジョブに書き、必要なタイミングでバッチ実行するだけでRedshiftにあるデータを扱えるようになります。

Queueryの設定ファイル

# configuration
RedshiftConnector.logger = Logger.new($stdout)
GarageClient.configure do |config|
  config.name = "queuery-example"
end
QueueryClient.configure do |config|
  config.endpoint = 'queuery_api_server_host'
  config.token = 'XXXXXXXXXXXXXXXXXXXXX'
  config.token_secret = '*******************'
end

Queueryのクライアントコード

select_stmt = 'select column_a, column_b from the_great_table; -- an awesome query shows amazing fact up'
bundle = QueueryClient.query(select_stmt)
bundle.each do |row|
  # do some useful works
  p row
end

コンソール

また、簡易的なものではありますがWebコンソールも付属で用意してあり、コンソールではクライアント側の認証に必要なトークンを発行・無効化したり、直近でQueueryに投げられたクエリの様子を確認できます。

f:id:ragi256:20211202111451p:plain
QueueryのWebコンソール画面の例

QueueryサーバーのAPI側はシンプルなRailsで作られており、コンソールのフロントエンドはTypeScriptとReactでSPAにしています。

最近の改修内容

Queueryを紹介するついでに今年自分が改修を行った箇所について書いておきます。

QueueryアカウントとRedshiftユーザーの紐付け

以前はQueueryのコンソールから好きな名前のQueueryアカウントを誰でも作ることができ、既存Queueryアカウントの認証用トークンを誰でも有効・無効切り替えができる仕様になっていました。また、RedshiftでのUnload文実行はQueuery専用に用意された1つのRedshiftユーザーによって行われていました。

このままでは社員の誰かが他チームのQueueryアカウントに手を加えてしまう恐れがあります。また、Unloadに使うユーザーの権限をQueueryアカウント毎、個別に分けることもできません。DWHに関するDevOpsを進めていく一環として、利用者の権限をきちんと分離し、Queueryアカウントをそのアカウント所有者・所有チームのRedshiftユーザーにしか扱えないようにする必要がありました。

そこで、Queueryアカウントについて、作成・認証用トークン作成、削除のタイミングでRedshiftユーザーの認証を求めるようにしました。認証作業自体はRedshiftにチェック用の単純なクエリを直接を投げるのみとし、本人確認がとれればユーザー名のみ記録することとします。 その後、実際のUnload文実行時には登録されたユーザー名を使ってGetClusterCredentials APIで一時的なユーザーを作成することにしました。

temporal_credential = Aws::Redshift::Client.new.get_cluster_credentials({
  db_user: redshift_user,
  db_name: database_name,
  cluster_identifier: cluster_identifier,
  auto_create: false
})

ds.config.merge!(username: temporal_credential.db_user, password: temporal_credential.db_password)
export_execute(datasource: ds, query_statement: sql, logger: logger)

こうすることでQueueryアカウントの管理は所有者であるRedshiftユーザーのみが行え、アカウント毎のクエリ実行はそのアカウントに紐付けられたRedshiftユーザーに基づいて実行されるようになりました。

ただし、現状ではRedshiftユーザーとQueueryアカウントとの2重管理になっており、権限管理が無用に複雑化しているという問題も抱えています。この点については今後Queuery側でのアカウント管理をやめ、RedshiftユーザーをそのままQueuery側のアカウントとして扱えるようにしようかと検討しています。

Unload文のmanifest.jsonを使った型キャスト

これまでQueueryにより出力されたファイルは全て圧縮&分割されたCSVとしてS3に出力されており、Queueryクライアントではその型を自動判別することができませんでした。そのため、Queueryクライアントを利用する開発者は取得した結果に対して手動で型キャストを行うコードを書く必要がありました。

RedshiftのUnload文には様々なオプションがあり、その中にはUnload結果に関するメタ情報を出力する、MANIFESTオプションがあります。
https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/r_UNLOAD.html

このオプションにより出力されるJSON形式のマニフェストファイルの中には列名とデータ型に関する情報が含まれています。このマニフェストファイルを読み、自動で各カラムの型を判別して型キャストができるよう、QueueryサーバーとQueueryクライアントの両方に改修を行いました。

sql = "selectt 1, 1::bigint, 1.0, 'hoge', false, date '2021-01-01', timestamp '2021-01-01 00:00:00', null"

bundle1 = QueueryClient.query(sql) # 従来
bundle1.each do |row|
  p row # => ["1", "1", "1.0", "hoge", "f", "2021-01-01", "2021-01-01 00:00:00", ""]
end

bundle2 = QueueryClient.query(sql, enable_cast: true) # 型キャストオプション追加
bundle2.each do |row|
  p row # => [1, 1, 1.0, "hoge", false, Fri, 01 Jan 2021, "2021-01-01 00:00:00", nil]
end

また、副産物として従来では文字列型の空文字列と区別しづらかったnullをきちんと区別できるようになりました。

BarbequeからRedshift DataAPIへの非同期処理移行

QueueryではSQLを受け付けてからUnload文の実行結果を返却するまで、処理時間はSQLの内容に依存しています。SQLによっては非常に時間がかかってしまうため、非同期化をする必要がありました。そこで、元々はBarbequeというキューシステムを利用してジョブの非同期化をしていました。BarbequeはDockerとSQSを利用したジョブキューシステムです。

以前はこれでうまくいっていたのですが、2020年4月に起きたSQS障害で影響を受けたことや、Queueryの構成が複雑化していたことなどもあり、もっとシンプルで頑健性の高い仕組みにできないかと考えられていました。 そこで、2020年にRedshift Data APIが発表され、そのAPIに含まれるexecuteStatementdescribeStatementを利用すればBarbeque依存を外せそうだという案が上がりました。調査したところ非同期処理の周辺をこちらで保つ必要がなく、Queueryの構成をシンプル化できそうだということがわかりました。

f:id:ragi256:20211202115057p:plain
移行によるQueuery構成の変化

移行後は特に問題らしい問題が発生すること無く安定して稼働し、無事Barbequeからの依存を取り除くことができました。

Queueryと弊社データ基盤の構成

そもそもRedshift Data APIが扱えるのであれば、各開発者が自由にexecuteStatementをし、各自がUnloadをすればよいのではないか? そうすればこのシステムと運用は不要になるのではないかという意見もあるかと思います。

「背景」に書いたような理由から単にデータ取得をUnload文に絞りたいというのも理由にありますが、本当はもっと根本的な理由もあります。 弊社データ基盤では権限管理やデータガバナンスなどの運用観点から、設計思想にもとづくいくつかのポリシーがあります。(下記は一部抜粋で、他にもこういったポリシーもあります)

  1. Redshift内部がカオス化するのを避けるため、Redshiftへの書き込みはDWHチームが管理しやすいよう手段を限定する
  2. Redshiftへのバルク&ストリーミングロード、DWH内部のETLバッチ(集計処理など)、外部へのデータ転送は各種専用ツールを使ってワークフローを分ける
  3. できる限り自動化を進め、権限を移譲できる部分はできる限り強い権限を各チームに移譲し、各自でやってもらう

弊社がQueueryやBricolageといったDWH用ツールを作り、運用している理由はここにあります。DWHチームによる中央集権ではなく、できる限り民主的なデータ活用を推進していくにあたって、無秩序や混沌を避けるための必要な施策がDWH周辺ツールの充実でした。Queueryもまたその1つです。

Queueryを扱うことで社内の開発者誰もが気軽にRedshiftを活用できるようにしつつも、DWHチームによるデータフロー把握や障害時対応がしやすくなります。Redshiftからのデータ取得手段をQueueryに絞ってしまうことで何か不便であったり問題が発生するようなことがあれば、その都度上記のポリシーを考慮しつつチームで解決策を考え、実装していけばいいという方針です。

つまり、利用者の権限を緩めて自由に利用してもらいつつも、必要なところは手段を固定し、DWHチームによる運用負荷を減らすために必要だったということです。

DWH基盤を整えるためのエコシステムとQueuery

2021年を通して上記のような改修作業を続け、活発な開発が行われてきたQueueryでしたがOSSとしてGitHubに公開されていたのはクライアント側の実装のみでした。開発を続けてきて構成もシンプル化できたこともあり、今回OSSとしてサーバー側の実装も公開することとしました。これで、Redshiftに対するbatchシステム用ツールファミリー bricolages以下にQueuery周辺ツールが全て揃いました。

(※ redshift_connectorはRedshiftからデータを取得した後、ActiveRecordを利用してRDBMSのテーブルを簡単に更新できるようにするgemです)

[2021-12-09 追記] Python版クライアントも公開されました。PyPiから利用できます。 https://github.com/bricolages/queuery_client_python

Techlifeでも何度かご紹介している(2017年版2019年版2020年版)通り、弊社データ基盤グループはRedshiftを中心としてDWHとその周辺システムを構成しています。Redshiftを活用したデータ基盤構築するために必要なツール群のほとんどは内製であり、ツールを組み合わせて運用しています。

今回、また1つQueueryというデータ基盤を構築するエコシステムの一部を新たに公開することができました。クックパッドではDWHだけに留まらず、bdash-serverDmemoなどの多くのデータ関連ツールをOSSとして開発し、公開しています。これらのツールがより多くの人に使われ、活発な開発のもと相互に連携し扱いやすいエコシステムを形成する未来が訪れれば良いと考えています。