DMS を利用した継続的なデータ変更検知

SRE の鈴木 (id:eagletmt) です。先日、この開発者ブログで赤松から One Experience プロジェクトについての紹介がありました。

自分もこの One Experiene プロジェクトに携わっており、このプロジェクトが始まったちょうど1年くらい前にはイギリスにあるオフィスに行き、グローバル版のシステムを開発・運用しているメンバーと直接顔を合わせたりもしていました。自分は One Experience プロジェクトにおいて主にデータ移行について担当していました。データ移行に必要な作業はいくつかありますが、この記事ではその中から日本版のシステムのデータベースで発生したデータの変更をどのようにしてグローバル版のシステムのデータベースに継続的に反映したかの部分について紹介します。

One Experience でのデータ移行

赤松の記事にも書かれていたように、日本版とグローバル版は完全に独立したシステムとして動いており、Aurora MySQL を利用しているという共通点はありつつもデータベースサーバはそれぞれ独立していました。グローバル版のシステムはいくつかのレシピサービスを買収し取り込みながら成長してきた歴史がありますが、取り込み元のサービスを一度停止してからデータを移行するのが基本でした。

今回の場合、取り込み元のサービスにあたる日本版のシステムを一度停止してからデータ移行を行うという手段を取る可能性も考えましたが、利用者の多い日本版のシステムに停止期間を設けることには高いリスクがありますし、グローバル版に移行したバージョンのモバイルアプリを段階的にリリースする計画や本番環境の実データを使って少人数に対して事前にユーザテストを行う計画が早期に決定されたので、両方のシステムを稼働させながらダウンタイム無しでデータ移行を行う方針になりました。そのためには日本版のシステムで発生したデータの変更を継続的にグローバル版のシステムに反映するしくみが必要となります。実際、このおかげで One Experience のロールアウトを段階的に安全に進めることができました。

データ変更を検知する方法

データの変更を継続的に反映するには、まずはデータの変更を検知する必要があります。日本版のシステムでは、マイクロサービス間でデータ連携をするために Ping というライブラリを用意し Rails アプリから Amazon SNS にイベントを発行するようにしていました。Ping については過去の記事 https://techlife.cookpad.com/entry/2017/05/10/130000 でも少し触れられています。

しかし、この Ping を長年運用しているうちに出てきた課題の1つとして信頼性があまり高くないという点がありました。Rails のレイヤでイベントを発行するため、たとえばデータを更新したのにイベントを発行する実装を忘れてしまったり、データを更新してからイベントを発行するまでの間に予期せぬ例外などで処理が中断してしまったり、イベントを発行する実装を経由しないようなイレギュラーな更新処理を実行してしまったりと、データが更新されているのにイベントが発行されないケースがいくつか考えられました。

今回はより信頼性の高い方法として MySQL の binlog を使うことを考えました。binlog であればどのような手段で更新されてもデータの変更を検知できそうです。一方で binlog を扱うようなソフトウェアを自作することにはややハードルがありました。binlog 中のイベントを正しく解釈する必要がありますし、binlog をどこまで読んだかを一貫性を持ってどこかに永続化する必要があったりと、考慮しなければならない点がいくつかありそうです。そこで、AWS Database Migration Service (DMS) に着目しました。DMS はその名の通りデータベース移行用のサービスですが、その機能の1つとして MySQL の binlog を読んで Amazon Kinesis Data Streams にその内容を送信することができます。
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Task.CDC.html
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html
Kinesis Data Streams にデータをストリームすることができれば、その先は AWS Lambda を起動したり Kinesis Client Library (KCL) を使ったりすることができるので、データの処理方法の幅が一気に広がります。DMS の transformation rules でも簡単なデータの加工はできますが、日本版のシステムとグローバル版のシステムでは MySQL のテーブルのスキーマも当然異なっており自由度高くデータを加工して移行する必要があったため、今回は Kinesis Data Streams から Lambda を起動してデータを加工しながらグローバル版のシステムに継続的にデータ変更を伝えることにしました。

DMS を利用した継続的なデータ変更検知

DMS を利用した継続的なデータ変更検知の概要は以下のように図示できます。

DMS を利用した継続的なデータ変更検知

Aurora MySQL -> DMS

まず、事前準備として Aurora MySQL で binlog を作るよう設定しておきます。これはパラメータグループで設定できます。
https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/USER_LogAccess.MySQL.BinaryFormat.html
Aurora MySQL の binlog を読むような機能は DMS では CDC と呼ばれています。Aurora MySQL をソースとしたエンドポイントを作成し、レプリケーションインスタンスと CDC を指定したレプリケーションタスクを作成すればすぐに用意できます。
また、今回の用途では変更前の値と変更後の値の両方を知りたいという要件がありました。たとえばどのカラムが変更されたのかを知りたい場合、変更前の値が必要になります。DMS の Before Image という機能を使うと変更前の値も含めて Kinesis Data Streams に送ることができます。
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.BeforeImage
この点も DMS の利用を決めた理由の1つです。

DMS の CDC を利用する上で注意する必要があるのは LOB の扱いです。LOB とは large binary object を意味する DMS の用語で、Aurora MySQL においては mediumtext 型の値などが該当します。
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.MySQL.html
LOB のカラムの値はデフォルトでは含まれないため、mediumtext 型などがある場合は LOB に関する設定も必要になります。
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.LOBSupport.html
そして LOB の扱い方についてはいくつか種類がありますが、Kinesis Data Streams をターゲットとしている場合は Limited LOB mode しか使えないという制限があります。
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.Limitations
また、Before Image 機能を使うときにも LOB には制限があります。今回のケースでは LOB に該当するカラムが一部含まれていたので Limited LOB mode を有効にしつつ、そのカラムは変更前の値を知る必要がなかったのでそのカラムでは Before Image 機能は無効化しました。

DMS -> Kinesis Data Streams

この部分は DMS のレプリケーションタスクのターゲットに Kinesis Data Streams に向けたエンドポイントを指定するだけなのでとくにこれといった工夫はありません。日本版のシステムはマイクロサービスに分割されていたためいくつかの Aurora MySQL クラスタからデータ移行する必要があり、Aurora MySQL クラスタ毎に DMS のレプリケーションタスクを作成しつつも送信先の Kinesis Data Streams は1つだけにしました。

Kinesis Data Streams -> Lambda

Lambda の event source mapping において Kinesis Data Streams を指定することで、Kinesis Data Streams に流れてきたデータを入力として Lambda を起動することができます。
https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
より効率的にデータを処理するため、ある程度イベントを溜めてからまとめて Lambda を起動することもできます (いわゆる batching)。 batching を行うことでより効率的にデータを処理することができますが、batching でまとめられたイベントの一部だけ処理に失敗したときのことを考慮する必要がでてきます。このような状況に対応するため、Lambda では ReportBatchItemFailures を有効化することで部分的な失敗を扱うことができます。
https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-batchfailurereporting.html
今回のデータ移行でもこれを活用し、batching を行いながら必要なリトライ回数が減るようにしています。

Lambda -> Global system

Kinesis Data Streams を経由しながら DMS がキャプチャした変更前の値と変更後の値、そしてそのメタデータを入力として Lambda が起動されます。Lambda はメタデータに含まれているテーブル名や変更が行われたカラム名を見ながら、グローバル版のシステムに必要なデータを組み立てて送信します。データの種類によっては、日本版のシステムでは2つ以上のテーブルで表現していたデータをグローバル版のシステムでは1つのテーブルにマッピングする必要があるケースもあり、DMS がキャプチャした1つのテーブルの変更内容だけでは必要なデータが足りないこともありました。その場合はデータ元の Aurora MySQL クラスタにクエリし直してデータを補うことにしました。厳密な意味では一貫性がとれていない方法ですが、このような楽観的な方法でデータ変更を反映しつつ、後からデータ移行の失敗に気付いたときに移行し直して整合性をとるという方針でデータ移行を進めました。

コスト

この構成においては主に

  • DMS
  • Kinesis Data Streams
  • Lambda

という AWS のサービスを利用していますが、料金面で支配的だったのは DMS のレプリケーションインスタンス、Kinesis Data Streams のシャード、そして Lambda のログを保存する CloudWatch Logs でした。batching の効果もあり、Lambda の料金はこれらと比べるとほんのわずかです。料金がどれくらい必要かはデータの変更頻度がどれくらいかに大きく依存しますが、クックパッドというサービスは更新系よりも参照系のほうがずっと多い性質を持っているため、DMS のレプリケーションインスタンスは dms.t3.small で十分でしたし、Kinesis Data Streams は単一シャードでもほとんど間に合う規模でした。一方でインフラ管理はすべて AWS に任せることができているので、運用の手間も含めたコストはかなり安いと言えると思います。

まとめ

One Experience プロジェクトにおけるデータ移行を担当し、その一部である日本版のシステムのデータベースで発生したデータの変更をグローバル版のシステムのデータベースに継続的に反映する方法について紹介しました。binlog を読むためのライブラリは世の中にいくつかあり、有名なところだと Go 向けのモジュールの github.com/go-mysql-org/go-mysql が実装していたりしますが、DMS を利用することで面倒な箇所を AWS のマネージドサービスに任せることができたと思います。今回は異なるシステム間の単方向の継続的なデータ移行のために利用しましたが、最初のセクションで触れた Ping のようにサービス間のデータ連携の基盤としても利用できそうですし、意外と応用範囲は広いかもしれません。