S3に保存したログファイルをストリーム処理するサーバーレスアプリケーションの紹介

インフラストラクチャー部セキュリティグループの水谷(@m_mizutani)です。

クックパッドでは現在セキュリティ監視の高度化に取り組んでおり、その一環としてセキュリティ関連のログ収集およびその分析に力を入れています。ログ収集の部分では可用性などの観点からAWSのオブジェクトストレージサービスであるS3に一部のサービスやサーバのログをまず保存し、後から保存されたファイルを読み込んで分析などに利用しています。分析のためにS3に保存したファイルを前処理する方法としてAWS Glueなどを用いたバッチ処理がありますが、到着したログをなるべくストリームデータのように扱いたい場合もあります。特にセキュリティ関連のログでは以下のようなユースケースで利用しています。

  • アラートの検出: ログを検査してその中から危険度の高いと考えられるログを探し出し、アラートとして発報します。アラートの具体的な例としてはオフィス環境からの不審な通信やスタッフ用クラウドサービスでの不審な活動、スタッフPC上の不審イベントなどが挙げられ、ログの各種パラメータをチェックして危険度を判定します。
  • ログの変換と転送: S3に蓄積されたログの形式を変換して別のデータソースへ転送します。具体的にはGraylogに転送してログ分析をしやすいようにしたり、Athenaによって検索できるように変換して別のS3バケットに配置するといったことをしています。

こうした処理はバッチで対応できないこともありませんが、例えばセキュリティ分析の文脈においては、2, 3分程度の遅延であれば許容できても、1時間単位の遅れが発生するのは少し困ります。できる限り到着したものから順次処理して分析までのレイテンシを短くしたいところです。本記事ではこのような処理に対し、S3に保存したログファイルをなるべくレイテンシが短くなるように処理するためのアーキテクチャ、そしてそのアーキテクチャのためのフレームワークを紹介したいと思います。

最もシンプルなS3オブジェクトの処理構成とその課題

AWSの環境において、S3に到着したログを処理するのに最もシンプルな構成はどのようなものでしょうか? おそらく下の図のように、S3にオブジェクト(ファイル)が生成されたというイベントによってサーバーレスコンピューティングサービスであるAWS Lambdaが起動し、その後起動したLambdaがオブジェクトそのものをダウンロードして処理する、という構成かと思います。

f:id:mztnex:20180502110255p:plain

この構成はシンプルで構築しやすいのですが、実際に運用してみるといくつかの課題が見えてきました。

  • S3のObjectCreatedイベントの送信は、1つのS3バケットにつき1つの宛先にしか設定できない: 単独のサービスしか動かしていない場合は直接Lambdaを起動するだけで事足ります。しかし、S3にオブジェクトが生成されたタイミングで何か処理をしたいという要求が2つ以上でてくると、直接Lambdaを起動するのではなく別の方法を考えないとなりません。
  • 流量制限が難しい: 例えばS3に保存されたオブジェクトを変換して別にデータストアに投入するというタスクでは、投入する先のデータストアに対する流量を気遣う必要があります。データストアの種類にもよりますが流量を超えすぎると全体のパフォーマンスが低下したり、データをロスするといったことが起きる可能性があります。当然、予想される流量に対して受け側のキャパシティを確保してくのは重要なのですが、ログの種類によっては一過性のバーストがしばしば起こります。S3から直接Lambdaを起動しているとこういった任意のタイミングで外部から流量をコントロールするのは難しいです。AWSの機能でLambdaの同時実行数を制限してスロットリングさせる方法もありますが、処理順序が時系列と大幅にずれたり複数回失敗して自前で再試行しないとならないといったところで煩雑になってしまいます。
  • エラーのリトライが大変: システムやサービスを常に更新・拡張していると実運用におけるエラーの発生は避けられません。よってエラーが起きたリクエストに対して再試行するという場面はたびたびあるのですが、これをなるべく簡単にやりたいという思いがあります。例えばエラーの発生はCloudWatch Logsに実行時ログとして残すことができますが、実行時ログの量が多くなるとそもそも検索が難しくなるなどの問題があります。また、どのエラーに対して再試行したのか・していないのかを手動で管理することになり、煩雑になってしまいます。

実際に使われているサーバーレスアプリケーションの構成図

f:id:mztnex:20180502110311p:plain

いくつかの課題を解決してスムーズな運用を試行錯誤した結果、上記の図のような構成に落ち着きました。シンプルな構成に比べるとそれなりに複雑なサーバーレスアプリケーションに仕上がりましたが、課題となっていた部分を解決しています。S3にオブジェクトが作成されてからどのように処理されるかというワークフローを、順を追って以下で説明します。

  1. S3のイベントはLambdaで直で受けるのではなくSNSで配信するようにしています。これによって複数の処理を実行したい場合でもSNSからイベントを受け取ることができるようになります。
  2. S3のObjectCreatedイベントを受けた EventPusher というLambdaが一度これをKinesis Streamにイベントを流します。
    • Kinesis Streamは一度データを投入すると指定時間(デフォルトで24時間)保持されるので、これ以降の処理を一時的に止めなければいけなくなってもイベントを蓄積し、あとから自由に読み出すことが可能です。
    • Kinesis Streamには Fast-laneSlow-lane という2つを用意しています。前者が高優先、後者が低優先にS3オブジェクトを処理するもので、EventPusher が送られてきたイベントの中身に応じてどちらかに振り分けます。それぞれのStreamに設定の違いはありませんが、この先にあるDispatcherの役割が異なります。
  3. それぞれKinesis Streamからイベントを受け取ったFastDispatcher と SlowDispatcherMain を非同期で呼び出します。Main はシンプルな構成のLambdaに該当し、最終的にユーザがやらせたい処理を請け負います
    • 便宜上、 FastDispatcherSlowDispatcher と名前をつけていますが、構成に違いはありません。そのかわり、 EventPusher でなるべく遅延なく処理したいと思うものを Fast-lane からの FastDispatcher 、 多少遅延してもいいものを Slow-lane からの SlowDispatcher にそれぞれ振り分けます
    • 各 Dispatcher には DELAY という環境変数が設定されており、これに整数値 N をセットすることで Main を呼び出した後に指定した N 秒 sleep した後に Dispatcher が終了します。Kinesis StreamからはLambdaは直列&同期的に呼び出されるため、時間単位で呼び出される回数が抑制され、全体の流量が抑制されます。基本的には SlowDispatcher の遅延を増やすことを想定しています。
    • 特に流量制限はこのサーバーレスアプリケーションの外部のメトリック値(例えばDBに実際に投入されるデータの流量など)を参照するためこの図には流量制御の仕組みはでてきませんが、例として定期的にメトリック値をチェックしたりCloudWatch Alarmのような仕組みで遅延を調整するという方法が考えられます。
  4. 起動した Main はイベント情報として作成されたS3オブジェクトのバケット名とキーが引き渡されるので、実際にS3からオブジェクトをダウンロードして必要な処理をします。
  5. Main の処理が適切に終了しなかった場合、2回まで再試行されますが、それでも正常終了しなかった場合はDLQにエラーの情報が引き渡されます。ここでは実装上の都合などからSNSを使っています。
  6. DLQ(SNS)経由で Reporter が呼び出され Lambda の requset_id と呼び出し時の引数(EventPusher から射出されたイベント)が引き渡されます
  7. Reporter は渡されたエラーの内容を粛々と ErrorTaskTable に投入します。ここまでで自動的に処理されるデータフローは一度終了します。

これ以降は保存されたエラーをどのように対応するか、というワークフローになります。

  1. 任意のタイミングでユーザが Drain を起動します。瞬間的なバーストや不安定なリソースによる失敗は (5) の2回までの再試行である程度解消されると期待されますが、それ以外のコード由来のエラーなどは何度試行しても同じ結果になるためループさせるとシステム全体の負荷になってしまします。なので、ここまでエラーで残ったものについてはユーザが確認してから再度処理のワークフローに戻すのが望ましいため手動で発火させる仕様にしています。
  2. 起動した DrainErrorTaskTable からエラーを全て、あるいは選択的に吸い出してます。
  3. Drain は吸い出したエラーになったイベントを再度Kinesis Streamに放流します。この時送信するイベントは EventPusher が作成したものですが、 EventPusher はKinesis Stream投入時にどちらのstreamに投入したかの情報を付与しているため、もともとのstreamに戻されます。

このようなワークフローで処理することで、エラーが起きた場合でも少ない手間で修正・再実行し開発と運用のサイクルを回していくことができます。

実装

上記のサーバーレスアプリケーションを展開するために slips というフレームワークを実装して利用しています。このフレームワークはPythonのライブラリとして動作し、導入したプロジェクトを先程のサーバーレスアプリケーションの構成で動かすための大部分をサポートしてくれます。サーバーレスアプリケーションを作成する時に苦労するポイントの1つにコンポーネントの設定や各コンポーネント同士のつなぎ込みといった作業があるかと思います。slipsは「S3からログファイルを読み込んで順次処理をする」というタスクを実行するサーバーレスアプリケーション作成におけるコンポーネントの設定、および統合をサポートしてくれます。これによって開発者がサーバーレスアプリケーションのコンポーネントを構成する際の消耗を最小限におさえ、ログデータを処理する部分の開発に集中できます。

slipsの実態としてはCloudFormationのラッパーになっています。サーバーレスアプリケーションのコンポーネントを制御するフレームワークとしては他にもAWS Server Application ModelServerless Frameworkが挙げられますが、ここの部分を自作したのは以下のような理由によります。

  • 構成の共通化: 冒頭で複数の処理に利用したいと書きましたが、最終的に読み取ったログの処理する部分以外はほとんどが共通した構成のサーバーレスアプリケーションになります。また、構成そのものをアップデートしてよりよい処理に変えていく必要があるため、複数のほぼ同じ構成を管理するのは冗長でメンテナンスの負荷が大きくなってしまいます。そのため、大部分の構成の定義を自動的に生成してCloudFormationで展開するというアプローチになっています。
  • 環境による差分の調整: ほとんどが同じ構成と上述しましたが、一方で環境ごとに微妙な構成の差もあるためこれを吸収する必要があります。例えばある環境ではコンポーネントを自動生成できるが、別の環境では専用の承認フローに基づいて作成されたコンポーネントを利用する必要がある、といった状況を吸収する必要があります。既存フレームワーク内にこういった環境ごとの差分を簡単なロジックで書き込むことは可能ですが、数が多くなったりロジックが複雑になったりすると管理が難しくなるので、割り切ってPythonのコードとしてロジックを作成しています。

このフレームワークを利用することで、先程のサーバーレスアプリケーションにでてきたAWS上の構成をほぼ自動で展開してくれます。先程のアーキテクチャの図を元に説明すると以下のような構成になります。

f:id:mztnex:20180502110328p:plain

S3へのログ保存とSNSへの通知設定、そしてS3に蓄えられたログをパースした後の処理(例えば、形式を変換したり、どこかへ転送したり、ログの内容を検査したりなど)を実施するためのコードは自前で用意する必要がありますが、それ以外の部分のコンポーネントをCloudFormationを利用してシュッと構築してくれます。これを利用することで、S3のログで何か処理をしたいユーザ(開発者)はS3に保存されたログが途中どのように扱われるかを意識することなく、最終的な処理の部分に集中することができるようになります。

導入と設定ファイルの作成

まずPythonのプロジェクト用ディレクトリを作成します。適当に作成したディレクトリに入って virtualenv などで環境を作成し、pipenv で slips を導入します。

$ virtualenv venv
$ source venv/bin/activate
$ pipenv install -e 'git+https://github.com/m-mizutani/slips.git#egg=slips'

次に設定ファイルを作成します。ファイル名を config.yml とします。サンプルはここ にありますが、本記事では要点のみ解説します。

backend:
  role_arn:
    event_pusher: arn:aws:iam::1234xxxxxxx:role/LambdaSlamProcessorEventPusher
  sns_topics:
    - name: SecLogUplaod
      arn: arn:aws:sns:ap-northeast-1:1234xxxxxx:seclog-event

上記は構築するサーバーレスアプリケーションの各要素の設定を記述します。ObjectCrated のイベントが流れてくるSNSトピックの指定は必要ですが、 role_arn の項では既存のIAMロールを割り当てることもできますし、省略すると必要なIAMロールをCloudFormationで自動的に追加します。同様にKinesis StreamやDLQに使われるSNSも、ARNを指定すれば既存のものが利用され、指定しなければ必要なものが自動的に生成されます。

これは組織や環境によって各リソースの管理方法が異なるため、別途作成されたリソースを使う場合とCloudFormationで自動的に生成する場合を切り替えることができるようにしています。例えばクックパッドでは本番環境のIAM権限などいくつかのリソースはGithub Enterprise上のレポジトリで厳密に管理されていますが、開発環境では比較的自由にリソースを作成できるようになっています。このような環境にあわせて容易に展開できるような実装になっています。

handler:
  path: src/handler.py
  args:
    sample

上記の設定は自分が実行させたい処理についての定義をしています。path で実行させたいファイルのパスを指定し、args で実行時に渡したい引数の内容を指定します。

bucket_mapping:
  mizutani-test:
    - prefix: logs/azure_ad/signinEvents/
      format: [s3-lines, json, azure-ad-event]
    - prefix: logs/g_suite/
      format: [s3-lines, json, g-suite-login]

上記のパートでどのS3バケットにどのようなフォーマットのオブジェクトがアップロードされるかを指定します。これを記述しておくことで、自分で書くスクリプトはパース済みのデータを受け取れるようになります。backet_mapping 以下のキーがS3バケット名、その下のリストでS3キーのプレフィックスとパースするためのフォーマットを指定します。上記例にでてくる指定は以下のような意味になります。

  • s3-lines: S3のオブジェクトをテキストファイルとして1行ずつファイルを読み込む
  • json: 1行をJSON形式としてパースする
  • azure-ad-event: AzureADのログ形式だと仮定して、タグ付けしたりタイムスタンプを取得する

コードの準備

設定の準備ができたら、自分が処理したいコードを用意します。デフォルトだと ./src/ 以下においたコードがzipファイルにアーカイブされてAWS上にアップロードされるので、今回は ./src/handler.py というファイルを作成します。サンプルとしてログの件数をCloudWatchのカスタムメトリクスに送信するコードを書いてみます。

import boto3
import slips.interface

class LogCounter(slips.interface.Handler):
    def setup(self, args):
        self._namespace = args['namespace'],
        self._count = 0

    def recv(self, meta, event):
        self._count += 1

    def result(self):
        metric = {
            'MetricName': 'LogCount',
            'Value': float(self._count),
            'Unit': 'Count',
        }

        cloudwatch = boto3.client('cloudwatch')
        cloudwatch.put_metric_data(Namespace=self._namespace, MetricData=[metric])

        return 'ok'  # Return some value if you need.

slips.interface モジュールに含まれている slips.interface.Handler クラスを継承してクラスを定義すると、これが読み出されて実行されます。新しく作成するクラスには以下のメソッドをそれぞれ定義します。

  • def setup(self, args): 最初に1度だけ呼び出されます。argsには handler設定項目内の args で指定した構造データが渡されます
  • def recv(self, meta, event): パースしたログ1件に対して1度呼び出されます。meta はパースする過程で得られた付加情報が格納されています。event は辞書型のパース済みログデータが渡されます
  • def result(self): 終了時に呼び出されます。returnで値を返してテストなど利用します

デプロイ

以上の準備ができたらAWSの環境にデプロイできます。ディレクトリには以下のようになっているかと思います。参考までに、サンプルの構成をgithubにあげておきます。

$ tree
.
├── Pipfile
├── Pipfile.lock
├── README.md
├── config.yml
├── src
│   └── handler.py
└── venv
    ├── bin
(省略)

この状態で slipsdeploy コマンドを実行します。実行にはCLI用のAWSのCredential情報が必要です(詳しくはこちら)以下が実行結果になります。

$ slips -c config.yml deploy
2018-04-13 14:40:07.379 INFO [cli.py:466] Bulding stack: log-counter
2018-04-13 14:40:07.379 INFO [cli.py:154] no package file is given, building
2018-04-13 14:40:07.736 INFO [cli.py:470] package file: /var/folders/3_/nv_wpjw173vgvd3ct4vzjp2r0000gp/T/tmpem2eyhgf.zip
2018-04-13 14:40:07.736 INFO [cli.py:474] no SAM template file is given, building
2018-04-13 14:40:07.763 INFO [cli.py:481] SAM template file: /var/folders/3_/nv_wpjw173vgvd3ct4vzjp2r0000gp/T/tmp1bahl7dn.yml
2018-04-13 14:40:07.763 INFO [cli.py:418] package command: aws cloudformation package --template-file /var/folders/3_/nv_wpjw173vgvd3ct4vzjp2r0000gp/T/tmp1bahl7dn.yml --s3-bucket home-network.mgmt --output-template-file /var/folders/3_/nv_wpjw173vgvd3ct4vzjp2r0000gp/T/tmpa1r9z4e7.yml
2018-04-13 14:40:08.652 INFO [cli.py:431] generated SAM file: /var/folders/3_/nv_wpjw173vgvd3ct4vzjp2r0000gp/T/tmpa1r9z4e7.yml
2018-04-13 14:42:11.138 INFO [cli.py:461] Completed (Applied)
2018-04-13 14:42:11.138 INFO [cli.py:559] exit

これがうまくいくとバックグラウンドでCloudFormationが実行され、対象のアカウントに必要なリソースがバーンと展開されます。同時に、CloudWatchのダッシュボードも自動生成されるので、それを見るとこのサーバーレスアプリケーションの実行状況がある程度把握できます。

f:id:mztnex:20180502110345p:plain

まとめ

AWS Lambdaや各種マネージドサービスを使ったサーバーレスアプリケーションは便利に利用できる反面、実行の制御やエラー処理などの勝手がインスタンス上とは異なり、実行の制御やエラー処理をスムーズに実施できるような構成にする必要があります。これをサポートする機能やサービスもAWSには充実しており、クックパッドでは日々それらを活用しながら改善に取り組んでいます。

今回はS3からログを順次読み取って処理するサーバーレスアプリケーションの構成と、それを補助するためのフレームワーク slips を紹介しました。今後、新たにサーバーレスアプリケーションを構築する方の参考になれば幸いです。