cookpad storeLive のクライアントアプリ開発の裏側

こんにちは。メディアプロダクト開発部の柴原(@nshiba310)です。 趣味は Destiny2 というゲームです。

普段は cookpad storeLive(以下、storeLive)のクライアントサイド(AndroidTV)の開発を行っています。 本記事では storeLive のクライアントサイドの開発についてご紹介したいと思います。

storeLive とは

スーパーマーケットの店頭に設置した縦型55インチの大型サイネージで、著名人や料理研究家による料理デモンストレーション映像をLiveや収録動画で提供するアプリです。
プレスリリースはこちら

スーパーの担当者はまず storeLive が置かれている売り場に適した再生したい動画を選択します。 storeLive は担当者が動画を止める操作をするまで選択された動画をループで再生し続けます。
また storeLive では土・日曜日など、比較的人が集まりやすいタイミングでライブ配信を行うことがあり、ライブ配信が始まるとアプリは自動的にライブ閲覧画面を起動し、終了すると自動的に前に再生していた動画再生画面に戻るようになっています。

データの自動更新

storeLive は通常のアプリと違い基本的に人間の操作が動画の選択時しかありません。
そのため、ライブ閲覧画面の起動や動画情報の更新などは自動で行う必要があり、 storeLive ではポーリング機能を実装しライブ配信や動画の情報を自動的に更新しています。

仕様としては、

  • サーバーリクエスト時にレスポンスに次回実行時間が含まれていた場合にはその時間で次の実行を登録
    • ライブ配信が近づいてきたら高頻度で情報の更新を行うため
  • デフォルトは10分間隔で実行
    • ライブ配信を本番より15分ほど先に開始しておきクライアントから10分間隔でアクセスすることにより、必ず全ての端末でライブ閲覧画面を起動させる
    • 高頻度リクエスト時は1分間隔で実行されることを想定

の2つがあり、これを実現するために storeLive では AlarmManager を採用しました。

AlarmManager を採用する理由

近年の Android アプリ開発において、定期実行処理を実装する場合には JetPack Components の WorkManager を使うことが候補としてあがってくると思います。 しかし、 WorkManager には以下の制限があり今回要求されている仕様を満たすことができないため使用することができませんでした。

  • 最低の繰り返し実行間隔は15分
  • 厳密な実行時間は保証されない
    • 実行間隔は WorkManager に設定した時間間隔の中で端末の状態が最適かどうか(Doze Mode や WiFi 接続している等)を判断し最適なときに実行されるため常に何秒/何分後に実行される、という保証ができない

一方で AlarmManager には setAlarmClock() というメソッドがあり、これを用いると1秒単位で実行時間を設定でき、きちんと設定した時間に発火してくれます。

WorkManager にはリトライ機構もあり可能なら使いたかったですが、今回は仕様に合わず AlarmManager を採用しました。

以下の例では 100 秒後に発火するアラームを設定しています。

val triggerTimeSec = 100

val calendar = Calendar.getInstance().apply {
    timeInMillis = System.currentTimeMillis()
    add(Calendar.SECOND, triggerTimeSec)
}
val intent = Intent(context, TestBroadcastReceiver::class.java)
val pendingIntent =
    PendingIntent.getBroadcast(
        context,
        0,
        intent,
        PendingIntent.FLAG_UPDATE_CURRENT
    )
val alarmManager = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager
alarmManager.setAlarmClock(
    AlarmManager.AlarmClockInfo(calendar.timeInMillis, null),
    pendingIntent
)

このようなバックグラウンド処理に関しては Android Developers にガイドがあるため一度読んでおくことをおすすめします。
https://developer.android.com/guide/background

オフライン機能

スーパーはいろいろな人が出入りする場所のためいろいろな電波が飛び交う可能性があり、ネットワーク状況が不規則に不安定になる可能性があります。
また storeLive が稼働する環境はスーパーの固定された位置を想定しているため、ネットワーク状況が悪くなってもネットワーク状況がいい場所に移動できないということもあり、ネットワークが不安定でアプリがうまく動かないという問題が稼働当初からありました。
他にも、 storeLive は基本的に動画を再生し続けるアプリであり人間の操作はほとんど存在しません。それに加えて、アプリを操作して動画を再生する人と動画を閲覧する人は別の人間です。そのため、通常のアプリでネットワークに接続出来ない場合によく取る手段としてリトライボタンを表示したり、 ネットワークの接続を確認してください といったような表示で凌ぐことはできません。

これらの問題がありますが、ネットワーク接続が切れるたびに担当者に復旧作業を行うようお願いしていては運用コストが跳ね上がってしまいかなりの負担になってしまうため、 storeLive ではできるだけアプリの安定性を高めるためにオフライン機能を実装しました。

オフライン機能を実装するにあたって取得したデータをローカルに保存する必要があり、今回はDBに保存することにしました。
Android でDBを扱うライブラリはいくつかあると思いますが、今回は Room を採用しました。
Room を採用した理由については、 storeLive プロジェクトでは Kotlin coroutines を採用していることや、 ViewModel や LiveData といった JetPack Components を採用しているので、これらと連携する手段が公式で用意されているためです。
また、チームとして Google が推奨しているアーキテクチャや JetPack Components をなるべく採用していこう、という動きがあるのも理由の一因です。

Room と LiveData で実現するオフライン機能

Room と LiveData を組み合わせて使うと簡単にそしてシンプルにオフライン機能を実装することが可能です。

Room 以下のようなデータを定義します。

@Entity(tableName = "movies")
data class Movie(
    @ColumnInfo(name = "id") val id: Int,
    @ColumnInfo(name = "movie_url") val movieUrl: String
)

次に Room で DAO を定義します。
以下の例では select 文を発行する関数を実装していますが、このとき戻り値を LiveData をラップすることでデータを LiveData 経由で非同期に取得することが可能です。
ちなみに LiveData を使う場合は裏側で勝手に別スレッドでDBに問い合わせするため、 selectAvailableMovies() 関数を呼び出すときに別スレッドで呼び出す必要はありません。

@Dao
abstract class MovieDao {

    @Query("SELECT * FROM movies")
    abstract fun selectAvailableMovies(): LiveData<List<Movie>>
}

使用するときは普通に observe してあげるだけです。
LiveData にデータが渡されるタイミングとしては、メソッドを呼び出した時の他に、 SELECT * FROM movies のクエリ結果に変更があった場合(movies table にデータが insert された場合等)に新しいデータが渡されます。

class MainActivity: DaggerAppCompatActivity() {

    @Inject
    lateinit var movieDao: MovieDao

    onCreate(savedInstanceState: Bundle?) {
    
        movieDao.selectAvailableMovies.observe(this, Observer { movies ->
            // 渡されたデータを処理する  
        })  
    }
}

こうすることで Activity や Fragment などデータが欲しい場所で API を叩くのではなく、 LiveData を observe しておき、新しいデータが欲しい場合は API を叩き結果を DB に insert すると、 UI の更新が可能です。
一見 DB が間に入っているため面倒ですが、一度でも UI が表示できれば DB にデータが入っているため、ネットワークがないとき、つまりオフラインでもアプリが動作するのでこれでオフライン機能の完成です。

また今回のオフライン機能は DB のデータを真としているため表示してる Activity/Fragment 以外からのデータの更新が可能です。
前述したとおり、 storeLive ではポーリング機能が存在してます。
もともとはポーリングでデータの更新を行ったあと BroadcastReceiver を用いて現在表示している Activity にデータの更新通知を送っていたのですが、今回のオフライン機能を実装したことでポーリングでデータの更新を行ったあと DB にデータを insert するだけで良くなったのは非常に嬉しかったです。

リストの表示には Paging と組み合わせて使う

JetPack Components の中にはリスト表示を行うためのライブラリとして Paging があります。 詳細な説明は省きますが、通常 Paging は DataSource クラスを使ってデータを読み込みます。
DataSource クラスには Factory クラスが用意されており、 Room は DataSource.Fractory クラスを戻り値にすることができます。

@Dao
abstract class MovieDao {
    
    @Query("SELECT * FROM movie_list_item")
    abstract fun selectMovieList(): DataSource.Factory<Int, MovieListItem>
}

また、 ktx の version 2.1.0-alpha01 から DataSource.Factory クラスに toLiveData() という拡張関数が用意されています。
この関数は内部で LivePagedListBuilder を用いて LiveData<PagedList> に変換してくれるため、そのまま RecyclerView でリストの表示が可能となります。

val movieList = movieDao.selectMovieList().toLiveData(
        config = Config(
            pageSize = 10,
            prefetchDistance = 10,
            enablePlaceholders = false
        )         
    )

movieList.observe(this, Observer {
    // RecyclerView で表示
    adapter.submitList(it)
})

オフライン機能の難しかった点

管理画面で操作した内容はいつ端末に反映されるのか

オフライン機能ではローカルにデータを保存するため、一度データを取得したあとはネットワークに繋がらなくてもアプリの表示が可能です。
そのため、何もしなければアプリ上からは これはいつのデータか というのはわからず、管理画面からデータの変更を行っても、それはいつ端末に反映されたのか、すでに反映された後なのか、といった判断が難しいです。
そのため、どういった操作をすれば必ずサーバーと通信しデータを更新できるのか、というのも決めておく必要があり、 storeLive では、画面遷移をするときには その画面に必要な情報自動更新に関するデータ(ライブ配信情報など) を更新する API を必ず叩くようにしました。

また、ネットワークに接続出来ていない場合には全画面上右上に ネットワークに接続していません という表示を出し、ひと目でネットワークに接続出来ているかどうかを判断できるようにしています。
ネットワーク接続判断は以下のように、接続状況が変わったら通知がくる LiveData を作り各画面で observe しています。

class NetworkCallbackLiveData(private val connectivityManager: ConnectivityManager) : LiveData<Boolean>() {

    private val networkCallback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            super.onAvailable(network)
            postValue(true)
        }

        override fun onUnavailable() {
            super.onUnavailable()
            postValue(false)
        }

        override fun onLost(network: Network) {
            super.onLost(network)
            postValue(false)
        }
    }

    override fun onActive() {
        super.onActive()
        val builder = NetworkRequest.Builder()
        connectivityManager.registerNetworkCallback(builder.build(), networkCallback)

        val network = connectivityManager.activeNetwork
        val networkCapabilities = connectivityManager.getNetworkCapabilities(network)
        value = network != null && networkCapabilities?.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) == true
    }

    override fun onInactive() {
        super.onInactive()
        connectivityManager.unregisterNetworkCallback(networkCallback)
    }
}

余談ですが、アプリ上ではネットワークに繋がっている判定で ネットワークに接続していません の表示がでないのですが、ルーターから先のネットワークに接続できないらしく Unable to resolve host "example.com": No address associated with hostname というエラーが出てしまってデータの更新が出来なくなる、という問題が発生していて困っています。

表示するコンテンツに掲出期間があるか

こちらはアプリやサービスの性質によりますがコンテンツには表示期間が存在することがあり、実際に storeLive では各動画に表示期間を設けています。 何度も言う通り、オフライン機能ではローカルにデータを保存するため、一度データを取得したあとはネットワークに繋がらなくてもコンテンツの表示が可能です。
逆に言うと、ネットワークに繋がらなければデータの更新は一生行われません。
そのため、表示期間が定められていると期間を過ぎてアプリ上に表示されてしまうとまずい状況になってしまう可能性があり、いつまで表示していいのかという情報も一緒に保存しておく必要があります。

Room ではデータを取得する時に where 句を書くことができるので、テーブルに表示期間のカラムを入れておくことでデータ取得時にフィルタリングできるので便利です。

@Entity(tableName = "movies")
data class Movie(
    @ColumnInfo(name = "id") val id: Int,
    @ColumnInfo(name = "movie_url") val movieUrl: String,
    @ColumnInfo(name = "starts_at") val startsAt: String,
    @ColumnInfo(name = "ends_at") val endsAt: String
)
@Dao
abstract class MovieDao {
    
    @Query("SELECT * FROM movies WHERE starts_at <= :date AND :date <= ends_at")
    abstract fun selectMovieList(date: String): DataSource.Factory<Int, MovieListItem>
}

1つ注意したいのが、この時に何も考えずに端末時刻を比較に使用してしまうと Android では端末時刻は容易に変更できるため表示期間のチェックとしては不十分な場合があります。
storeLive の場合では、アプリや端末自体の操作はユーザーには行われない想定なので、端末時刻は変わらないという前提の元比較に使用しています。もし、この方法を参考に表示期間のチェックをする場合には注意してください。

まとめ

storeLive の主にポーリング機能やオフライン機能の開発についてご紹介しました。
storeLive はまだまだサービスのあり方を模索している段階で機能追加や仕様変更などがたくさんあります。また大きな機能の実装がありましたご紹介したいともいます。

興味がある方いらっしゃいましたら、気軽にお声がけください。一緒に色々チャレンジしていきましょう。

info.cookpad.com

Amazon Athena を使ったセキュリティログ検索基盤の構築

こんにちは。技術部セキュリティグループの水谷(@m_mizutani )です。最近はFGOで一番好きな話がアニメ化され、毎週感涙に咽びながら視聴しています。

TL;DR

  • これまでセキュリティログ検索にGraylogを使っていたが、主に費用対効果の改善のため新しいセキュリティログ検索基盤を検討した
  • 自分たちの要件を整理し、Amazon Athenaを利用した独自のセキュリティログ検索基盤を構築した
  • まだ完全に移行はできていないが対象ログを1ヶ月間分(約7.5TB1)保持してもコストは1/10以下である3万円に収まる見込み

はじめに

セキュリティグループでは日頃、社内ネットワークやPC環境、クラウドサービスに関連するセキュリティアラートに対応するセキュリティ監視業務を継続しておこなっています。アラートに対応する時に頼りになるのはやはり様々なサービスやシステムのログで、そのアラートに関連したログを調べることにより誰が、いつ、どのようなコンテキストでそのアラートに関わったのかということを知ることができ、アラートのリスク評価において大きな役割を担っています。

一方でこのようなセキュリティ監視のためのログは大量になるため検索をできるようにするための基盤を整えるのも簡単ではありません。クックパッドのセキュリティグループではこれまでGraylogを使ってセキュリティ監視のためのログ検索基盤を構築・運用してきましたが、運用していく中でいくつかの課題が浮き彫りになったため、現在はAmazon Athenaを使ったログ検索基盤への移行を進めています。本記事では開発・移行を進めている新セキュリティログ検索基盤について解説します。

Graylogを利用する際の課題

過去に本ブログでも記事として紹介しましたが、クックパッドでは様々なログをセキュリティ監視のために取り込むことによって、アラート発生時に横断的な調査ができるようになっています。今日の企業における活動というのは1つの情報システムやサービスだけで完結するということはほとんどなく、複数のシステム・サービスをまたがって遂行されます。そのためアラートがあったときに全体像の把握をするためには複数種類のログを横断的に検索できるGraylogは非常に有用でした。しかし運用を続ける中で以下のような課題もでてきました。

  1. 弾力性があまり高くない:クックパッドで利用しているGraylogのバックエンドはAmazon Elasticsearch Serviceを利用しており、データノードをスケールイン・スケールアウトする機能が備わっており、データノードの負荷増大やディスク空き容量の減少に対して簡単に対応できます。ただ、スケールイン・スケールアウトは瞬時に実施できるわけではなく、経験則としては数分〜数十分の時間を要します。そのため、急なログ流量の増加のスムーズに対応するのは難しく、ログの取りこぼしなどが発生する可能性と向き合う必要があります。
  2. 使用頻度に対してコストが高い:弾力性の問題に対して余裕を持ったリソースを常時運用するという方法もありますが、今度はコストが問題になってきます。Graylogのバックエンドとして利用するElasticsearchはそれなりにリソースが必要となるため、利用するインスタンスの性能やディスクの容量を大きくとらなくてはなりません。現在クックパッドで運用しているGraylogでは一日あたり合計250GBのログを受け取り、ログの種類に応じて保管期間を最大1ヶ月、流量が多いものは1週間程度に収めることで、およそ月額40万円強、年間500万円ほどのコストをかけています。Graylogはインタラクティブかつ高速に検索が可能であるため、業務時間中は常にセキュリティ分析のために検索をしているような状況であればコストに見合ったメリットがあると考えられます。しかし、(幸いにも)クックパッド内で発生するセキュリティアラートは平均して日に2〜3回程度であるため、そのために年間500万円もかけるのはあまり効率的ではない、と考えています。
  3. Elasticsearchの運用が辛い:先述したとおりクックパッドで利用しているGraylogのバックエンドはAmazon Elasticsearch Serviceなので、自らでインスタンスを立てて運用するのに比べると比較的楽ですが、それでも負担はそれなりにあります。先述したとおりかなりの流量のログを投入するため、流量が想定を超えるとログの投入だけでなく検索にも影響がでます。また、様々な種類のログを投入するため、ログのスキーマや内容によって過大な負荷がかかるということがしばしばありました。そのため、それほど流量が多くないログでも投入に慎重にならざるをえず、またログの種類によってはそもそも取り込むのが難しいというようなこともありました。

これらの問題を解決するために、AWSのオブジェクトストレージであるS3をベースにした検索基盤が作れないか、ということをかれこれ1年くらい模索していました。

「セキュリティ監視」におけるログ検索の要件

具体的にどのようなアーキテクチャを採用したのかを説明するために、セキュリティ監視という業務をする上でのログ検索の要件を整理します。具体的には以下の5点になります。

  1. 複数種類のログに対してスキーマに依存しない検索ができる
  2. ニアリアルタイムで検索ができる
  3. 単語を識別した検索ができる
  4. ログの投入時に容易にかつ迅速にスケールアウト・スケールインが可能である
  5. 全体的な費用負担を減らす

Graylogはこれらの要件のうち1、2、3は十分に満たしていましたが、4、5の部分が期待する水準ではなかったと言えます。以下、各要件を詳しく解説します。

(要件1) 複数種類のログに対してスキーマに依存しない検索ができる

セキュリティ監視という業務の特性による最も重要な要件がこれだと考えています。通常、データ分析をしたい場合はデータごとに決まっているスキーマを理解し、そのスキーマに合わせたクエリを発行すると思います。一方でセキュリティ監視においては「あるキーワード(IPアドレス、ドメイン名、ユーザ名など)がどのフィールドに含まれるかを気にせず一気に検索する」というユースケースが圧倒的に多くなります。ログを絞り込んでいく上でフィールドを指定する必要はありますが、まず全文検索のような形でログを検索してどういった種類のログがどういった傾向で出現しているかという全体像を把握することが大切になってきます。

ログの種類が2〜3種類のみであればまだ人間がスキーマを覚えてクエリを作る事ができるかもしれませんが、数十種類になってくると人間がこれを覚えてクエリを作るのはあまり現実的ではありません。また、コード上でスキーマを管理してクエリを自動生成するというような方法も考えられますが、今度はメンテナンスが面倒になってきます(3rd partyサービスのログのスキーマがいきなり変わることはしばしばあります)そのため、ログのスキーマを全く気にせずに "10.1.2.3" というキーワードが含まれるログをバシッと検索できる仕組みが必要になります。

(要件2) ニアリアルタイムで検索ができる

セキュリティアラートが発生した場合、なるべく迅速に分析をできる状態になっていることが望ましいです。具体的なリアルタイム性(どのくらいの遅延を許容できるか)については一般的なManaged Security Service(MSS)のSLAが参考になります。いくつかのMSSではおよそ15分以内にアラートに関する第一報を報告するよう定められています。

これを基準とした場合、分析などに5〜10分ほどかかることを考えるとログが到着してからできれば5分以内、遅くても10分以内には分析ができる状態になっているのが望ましいと言えるでしょう。完全なリアルタイム性を実現する必要はありませんが、到着したログが逐次検索可能になるようなパイプラインは必要かと考えられます。

(要件3) 単語境界を識別した検索ができる

これは非常に地味ですが、意外と大切な要件だと考えています。検索する際に "10.1.2.3" を指定したら、 "110.1.2.3" や "10.1.2.30" を含む全てのログではなく、"10.1.2.3" のみが検索結果に出てくるようにするべきだと考えています。これはJSONなどの構造化データにおいて1つのフィールドに1つの単語しか入らない、ということが定まっているログについてはあまり悩む必要がありません。しかしsyslog由来のログやアプリケーションから出力されるログは自然言語のように記述されたログが頻出するため、ログ全文に対する単純な文字列マッチでは実現が難しいです。

単純な文字列マッチだけでなく、正規表現などを利用して前後の区切り文字などを排除するという方法もあることにはあります。ただその場合だと利用する人間がオリジナルのログの構文や構造などを強く意識してクエリを作成しなくてはならず、さらにトライ&エラーが発生するのを前提としてしまうため、あまり望ましくありません。さらに人間だけでなく別のシステムから自動で検索をかけたいと思った場合にトライ&エラーは期待できないため、一発で期待する検索ができることが望ましいです。

(要件4) ログの投入時に容易にかつ迅速にスケールアウト・スケールインが可能である

Graylog運用における課題1で述べたとおりですが、ログの種類が増えたり、ログの種類が同じでも流量が増えた際に速やかにスケールアウトし、負荷が低くなったらスケールインできる仕組みが求められます。場合によってはかなり頻繁にログの種類が追加されるということもあるため、その都度事前に流量や負荷の度合いを計算して準備をする、といった煩わしさがないような仕組みが望ましいと言えます。

(要件5) 全体的な費用負担を減らす

これもすでにGraylog運用における課題の2で述べたとおりですが、なるべく費用の負担が小さいに越したことはありません。特にセキュリティ監視という領域はいざというときに事業を守るために必要ではあるものの、お金をかけるほどビジネスが加速する、というものではないので費用を抑えられるに越したことはありません。

ログ検索のためのアプローチ

ここまでで説明させてもらった要件を満たす基盤を構築するため、以下の2つの方針を考えました。

(1) ログはAWS S3に保存して検索はAthenaを利用する

AWSのS3はオンラインストレージ(例えばEC2にアタッチされたEBSなど)と比較して非常に安価に巨大なデータを保持することができます。課題の部分でも説明させてもらったとおり、1日中高頻度に検索をするというシステムには向かないかもしれませんが、低頻度にアクセスするデータを保持しておくには最適な選択肢の一つだと考えられます。これによって大量のログデータを保持する場合でも全体的な費用負担を抑えることができます。

S3に保存したデータから必要となるログを検索するには、自分でSDKを使いS3からオブジェクトをダウンロードするようなコードを書く、S3 Selectを使う、Redshift Spectrumを使う、などいくつかの選択肢が用意されていますが、今回はクエリは低頻度であること+ある程度複雑なクエリが発生することなどを踏まえ、Amazon Athenaを利用することにしました。S3 selectは基本的に文字列のフィルタのみになるので、複雑な条件を指定するような検索には不向きになります。また、Redshift Spectrumは複雑なクエリを扱えますがクラスタを構築して常時稼働させるのが前提となってしまうため、低頻度にしか利用しないという今回のユースケースでは過剰に料金がかかってしまいます。Amazon Athenaは通常のSQLと同等のクエリが記述でき、さらにクエリによって読み取ったデータの量に応じて課金されるため、今回のユースケースに適していました。

(2) ログの投入時にLambdaを利用してインデックスを作成する

S3 + Athenaを使うことで費用面に関する問題は解決できますが、一方でログをそのまま保存していただけでは「スキーマに依存しない検索ができる」という課題を解決できません。AthenaはSQL形式のクエリを発行して指定したフィールドの値、あるいは集計結果を取得するため、そのままAthenaで横断的なキーワード検索をしようとすると全てのフィールドに対して検索をかけるという無茶が必要になってしまいます。

そこで、ログを保存用S3バケットに投入する際に元のログだけではなく、ログからインデックスを作成し、それをAthenaで検索できるようにします。全てのログを投入前に一度パースして辞書型に変換し、そのキーと値をもとにインデックスを作成します。例えばJSON形式で {"src_addr":"192.168.0.1", "dst_addr": "10.1.2.3"} というログがあったら、("src_addr", "192.168.0.1"), ("dst_addr", "10.1.2.3") という2つのレコードを作成し、それぞれにS3オブジェクトのIDや行番号などを付与します。これをインデックスとしてAthenaで扱うテーブルの1つとして作成します(Indexテーブル)。さらにログ本文とオブジェクトのID、行番号を含むテーブルも作成します(Messageテーブル)。イメージとして以下のようなテーブルをそれぞれ作成します。Object IDはただのカウンタであり、行番号はそのオブジェクト内で何番目にでてくるログなのかを示しています。最終的にはMessageテーブルの「ログメッセージ本文」が検索結果として返され、セキュリティ分析をする際に参照することになります。

f:id:mztnex:20191118230126p:plain

このようなテーブルを作成することで、全てのログから特定の値をもつフィールドを検索したり、あるいは特定のフィールドにのみ含まれる値をIndexテーブルから探すことができます。この結果をMessageテーブルと結合することで、特定の値を含むログをスキーマに依存せず探し出すことができるようになり、スキーマに依存しない検索が実現できます。

また、Lambdaを使ってインデックスの作成をすることで容易にスケールイン・アウトができるようになります。Lambdaは特別な設定をすることなく、タスクの増減に応じて同時実行数がシームレスに増えていきます。そのためログの流量が増えたとしても事前にスケールアウトするなどの準備も必要なく、AutoScalingが間に合わず処理の遅延やログの消失が発生する、というようなことも回避できます。また、ログ発生のイベントを受け取りLambdaで処理してS3に投入するというパイプラインによって、検索可能になるまでの遅延もおよそ1分程度に抑えることができ、セキュリティアラートが発生したあとログ検索ができなくて待たされるという問題が解決できます。

ログ検索基盤の設計と実装

f:id:mztnex:20191118230147p:plain

アプローチの節で説明したものを実装したアーキテクチャが上の図になります。実装名は Minerva(ミネルヴァ) と呼んでいます2。今回は原則としてサーバーレスで実装しており、点線内のS3バケット以外はCloudFormationでデプロイしています。これによって、バージョン変更時にstaging用の環境を作りたいと思ったら新しい設定ファイルを用意すればコマンド一発で真新しい環境を作ったり逆に不要になった環境を削除できます。Web UIについてもECSのspot instance上(参考記事)で動かしています。

このアーキテクチャは主に (1)ログの投入、(2)パーティションの作成、(3)ログのマージ、(4)ログの検索、という4つのパートに別れて構成されています3。それぞれ解説したいと思います。

f:id:mztnex:20191118230159p:plain

(パート1) ログの投入

ここまでの説明ではわかりやすさのために「ログが生成されたらそのままインデックスが生成される」というような書き方をしていましたが、実際にはログが本来保存されるS3に投入される → そのイベントをSNS+SQSで流してLambdaを起動する → Lambdaが対象のオブジェクトをダウンロード&インデックス作成をした後にMinerva用のS3バケットに投入し直す、ということをやっています。これはログの保全という観点から、まずはログをS3バケットに格納して保全できる状態にし、そこからいろいろな処理にパイプラインをつなげて可用性を高める、という考え方に基づくものです。これによってインデックス作成の処理が失敗したとしても、元のS3バケットを再度参照することで容易にリトライが可能になります。

先述の通り、処理にはLambdaを使うことでエンジニアが気にしなくてもスケールイン・アウトが可能となっています。実際には事故を防ぐために最大同時実行数を制限して運用していますが、最大同時実行数を多めに設定してもそれ自体に課金はされないので、計算した分だけの料金ですみます。

またログ検索時に本文中の単語を適切に識別して検索ができるように、Lambdaでパースをした際に単語を記号や空白と言った文字で分解したものをインデックスとして登録するようにしています。これはElasticsearchにおけるStandard Tokenizerに近い、トークン分割の独自実装を利用しています。例えば tani@cookpad.com というような文字列は tani, cookpad, com に分解してインデックスを作成し、検索する時にはこれらの単語を完全一致で全て含むログを検索するようにします。これによって tani@cookpad.com を検索したいときに mizutani@cookpad.com も引っかかってややこしい、という状況を回避できます。(もちろん、意図的に %tani を指定することで、mizutani@cookpad.com も引っかかるようにできます)

(パート2) パーティションの作成

次のパートはAthenaのテーブルのパーティション作成になります。Athenaは読み込んだS3オブジェクトのサイズの合計のみで課金が決まるため、少しでも不要な読み込みを減らすのがパフォーマンスおよびコストにとって重要になってきます。この読み取りサイズを減らす一つの方法として有効なのがパーティション作成です。S3のパスのプレフィックスをパーティションとして登録しておき、WHERE節でそのパーティションを指定するように条件を記述すれば、他のプレフィックスが読み込まれなくなり、読み込むデータサイズを大幅に削減できます。

例えばMinervaでは以下のようなフォーマットで変換したオブジェクトを保存しています。保存先のバケット名が ***-bucket です。

s3://***-bucket/some-prefix/indices/dt=2019-11-01-05/some-bucket/some-key.parquet

s3://***-bucket/some-prefix/indices まではただのプレフィックスですが、dt=2019-11-01-05 がパーティションを示すためのパスの一部になります。このパスの dt=2019-11-01-05(2019年11月1日5時の意味)をAthenaに登録しておくと、 WHERE dt = '2019-11-01-05' とすることによって、上記のプレフィックスを持つオブジェクトしかデータが読み込まれなくなります。これは '2019-11-01-04' <= dt AND dt <= '2019-11-01-06' と指定すると、4時から7時前(6時代)までを検索対象とできます。まず短い時間の範囲から検索したい単語などを探し、検索したい単語が見つからなかったりより深く対象の行動を追いたいということがあれば、その後必要に応じてより長い時間の範囲を検索していくことで、必要最低限のデータ読み込みですむ(=料金も安くてすむ)という使い方を想定しています。

(パート3) ログのマージ

IndexテーブルやMessageテーブルに投入されたオブジェクトはそのままでも検索はできるのですが、検索のパフォーマンス(クエリ速度)が悪化するという問題があります。Athenaのパフォーマンスチューニングのベストプラクティスによると128MB以下のファイルが多いとオーバーヘッドがでてしまうとあるのですが、現在のクックパッド内の環境だとログとして保存する際にある程度の大きさになるオブジェクトもあれば、完全に細切れで保存されるオブジェクトもあり、特に細切れのオブジェクトはパフォーマンスに影響を与えてしまう可能性が高いです。

そのため、一定時間ごと(現在だと1時間毎)にCloudWatch Eventを発火させて、マージすべきオブジェクトの一覧作成&どのようにマージすべきかの戦略をlistParquetというLambdaが判断し、mergeParquetというLambdaが複数のオブジェクトをマージするという作業をしています。これによって1日数十万単位で作成されていたオブジェクトを1000個程度に抑えることができるようになりました。

(パート4) ログの検索

最後は実際のログの検索です。ログの検索は当然Athenaに直接SQLを送っても結果を得られるのですが、「(1)ログの投入」パートで説明したとおり、検索対象を単語で分割するようなやや複雑なクエリを使う必要があります。これを利用する側が意識せずによりシンプルなAPIとして使えるようにAPI gatewayを通してリクエストできるようにしています。これは、この検索基盤をエンジニアがWeb UIを通して使うだけではなく、将来的に他の社内セキュリティシステムとも連動させたいと考えており、その際にSQL文構築の知識を分散させずにMinervaの中に閉じ込めておきたい、という意図もあります。

Web UIについてはまだ開発中ではありますが、現在は以下のようなシンプルなインターフェイスで動かせるよう開発を続けています。

f:id:mztnex:20191118230849p:plain

本アーキテクチャによる効果

先程の述べたとおりまだ開発中のものもありMinervaに完全に移行できたわけではないのですが、当初に考えていたコスト削減については大きな効果が期待できそうです。前述したとおり、現在クックパッドのGraylogはログの保存期間を流量に応じて1週間〜1ヶ月と調整しながら使っていて月に40万円以上かかっていますが、現在の開発しながら使っているMinervaのコストは全てのログを1ヶ月間(未圧縮で約250GB/日 x 30日=約7.5TB)保持しても月あたり3万円以下に収まっています。これによって当初の目的であったコストの抑制には大きく貢献できる見通しがたちました。

今後の課題

コストの面ではかなり大きな成果をあげられそうなMinervaですが、まだ開発における課題は以下のようなものが残っており引き続き開発を進めていきたいと考えています。

  • インデックス作成に関する計算パフォーマンスを向上させる:このシステムはインデックス作成が肝になっており、実はかかっている料金の半分以上がインデックス作成+マージのための計算コストになっています。現在は列指向ファイルの恩恵をうけるためにParquet形式を利用していますが、そのためのオブジェクト生成のためのCPUおよびメモリが必要になっており、これがコストに影響しています。そのため、より効率の良い方法でparquetに変換できるようにすることで、さらにコストを抑制できる可能性があり、コストを抑制できればさらに多くのログを長期間保存しやすくなるため、引き続き改良を続けていきたいと考えています。
  • 検索時のパフォーマンスを向上させる:当初から「検索時のクエリがGraylogより遅くなることは許容する」という前提で開発をしていたため、クエリ結果が返ってくるのが遅いという点については許容するものの、やはり早く応答が返ってくるに越したことはありません。現在だとやや複雑なクエリを投げているせいもあり、およそ20秒程度クエリに時間がかかってしまっています。一度結果が取得できればその結果からの絞り込みなどはできるようにしていますが、なるべくインタラクティブな検索を実現したいと考えています。そのためデータ構造やAthenaの使い方について、より検討を進めたいと考えています。

最後に

クックパッドではこうした「エンジニアリングでセキュリティの問題を解決する」ことを一緒にやっていける仲間を募集しています。興味のある方はぜひこちらをご参照いただくか、ご質問などあれば水谷(@m_mizutani)などまでお声がけください!


  1. 7.5TBという数字はあくまでもとのログの非圧縮状態でのデータサイズです。実際に保存される際はトークン分割などの変換や圧縮がかけられるので単純計算はできませんが、実測値でおよそ1/3〜1/4程度になります。

  2. これは決して厨二病を発症したわけではなく、Athenaの元々の意味であるギリシア神話の女神アテナの別名がミネルヴァとのことだったので、Athenaを少し変わった使い方をすることからこのような名前にしました。

  3. 今回のセキュリティログ検索基盤は弊社で別途運用されているデータ活用基盤のprismを大いに参考にさせてもらっています。詳しくはこちらの記事もご参照ください

サービス特性にあった検索システムの設計戦略

こんにちは!研究開発部ソフトウェアエンジニアの林田千瑛(@chie8842)です。あまりたくさん飲めないけど日本酒が好きです。 クックパッドが提供するサービスの検索や推薦機能の構築・改善を行っています。

本稿では、クックパッド本体の検索改善や推薦システム構築の傍らで、新規サービスであるクックパッドマート向けの検索システムをつくったので、その際の設計や精度改善の工夫について書きます。

新規サービスクックパッドマートと検索

クックパッドマートは、生鮮食品に特化したECサービスで、ステーションと呼ばれる場所に購入した食品を届けてくれるという特徴をもっています。2018年夏にサービス開始して以来順調にユーザ数を伸ばしています。中でも商品検索機能は、クックパッドマートの追加機能として9月にリリースしました。

検索システムの要件

プロダクトチームの当初の要件は以下のとおりでした。

  • まずは 1ヶ月で リリースしたい
  • 最初は商品検索機能を提供したいが、その後GISデータを用いた食品を受け取るステーション検索などが必要になる可能性がある
  • 商品検索は、UI/UXの要件上、絞り込み検索などではなく単純なキーワード検索がしたい
  • 商品インデックスのリアルタイム更新はあまり重要でない

また、データを眺めたり実際にサービスを使ったりする中で以下のようなことを予想できました。

  • インデックスサイズについて

    • 現状のインデックスサイズはそれほど大きくない(現在1G程度)
    • サービスの成長率が高く、将来的に商品数が増えることでインデックス化するデータは格段に増える可能性はある。しかし、配送機能をもつというサービスの特性上、配送エリアごとにインデックスを分けるといったことでインデックスサイズの上限は抑えられる
  • 検索精度のチューニングについて

    • 検索の使われ方と商品数を考慮すると、適合率よりユーザの目的にヒットしうる商品の再現率を高めることを重要視した方がよさそう
    • サービス側のキャンペーンなどの施策の追加が想定される。そのため今後インデックスのスキーマ変更やクエリのチューニングを行いやすいようにしたい
  • プロダクトチームの体制について

    • プロダクトチームはスピードを重視しており、メンテナンスコストは低い方がいい

上記を考慮して、最初のリリースに向けては以下の設計方針ですすめることにしました。

  • 検索インデックスやクエリは一旦今ある情報をもとに設計して一部のユーザにリリースし、実際の使い勝手を見ながらチューニングしつつ利用者を広げる(オフラインテストなどはあまりかっちりやらない)
  • 基盤設計は慎重に行い、今後のシステムのスケールやメンテナンスを行いやすいようにする

検索基盤の設計

検索サーバにはElasticsearchを利用します。 クックパッドはインフラ環境にAWSを利用しており、その上にElasticsearchサーバをデプロイする方法としては以下の3つが考えられます。

  1. Amazon Elasticsearch Service(以降AES)を利用する方法
  2. EC2上に構築する方法
  3. ECSクラスタ上に構築する方法

1.のAmazon Elasticsearch Serviceを利用するか2.3.の方法で自前でElasticsearchを構築するかという判断がまずあります。

AESはクックパッドの他のサービスでも一部取り入れられているという背景もあり、最初はAESを使うのがよいのではないかという声がありました。しかし、マネージドサービスは、システム管理の負荷を軽減できるというメリットがある一方で、そのマネージドサービスが提供する機能は大なり小なり限定されることを考慮する必要があります。よってマネージドサービスで提供される機能がサービスが必要とする機能範囲をカバーできるかどうかを見定める必要があります。

今回の要件にAESが合致するかどうかは検証を行い、結果的に利用しないことに決めました。 ボトルネックとなったのは以下の点です。

  • 既存インデックス上のAnalyzerの設定変更ができない(設定の異なる新規インデックスを作成した上でaliasを切り替えることで代用はできるがオペレーションが煩雑化する1
  • ユーザ辞書やシノニム(同義語)辞書をファイルで指定できない
  • AESはサポートされるプラグインが限られており、中でも日本語のTokenizerとしてkuromoji_tokenizerは利用できるが、素のElasticsearchであれば利用できるNEologd, UniDicといった別の辞書を用いるTokenizerを提供するプラグインが利用できない2
  • 出力できるログが限定的である。(エラーログとスローログしか取得できない)

クックパッドマートの商品検索は、例えば「じゃがいも」と入力したら「メークイン」や「ジャガイモ」でマッチする商品も検索結果に出したいといったように、日本語の辞書やシノニムといったAnalyzerのチューニングが検索精度に大きく影響するタイプです。 日本語を使わない場合やTwitterやInstagramで見られるようなシノニムの重要度が高くないタイプのキーワード検索、geolocation検索であればAESで提供される機能で十分な場合があると思いますが、今回の要件には合致しないという判断になりました。

さらに残る 2.のEC2と3.のECSについて検討を行いました。Elasticsearchはインデックスを保存するという意味である種のDBの機能を持ちますが、DBは基本的にECSなどのコンテナオーケストレーション環境にデプロイする例はあまりないと思います。なぜかというと、Elasticsearchやその他のDBはクラスタネットワーク内でノードディスカバリを行い、データの永続化やレプリケーションを行うことでデータの可用性とスケーラビリティ担保するクラスタ構成機能を持ちますが、エフェメラルな環境としてアプリのデプロイを行うことを目的として発展したコンテナ環境はこうしたことを前提として作られていないからです。(できないとは言っていないです。)

上記を考慮すると、基本的にはデプロイ先としてEC2を選ぶべきです。しかし、今回のケースは、以下のことがわかっていました。

  • (将来を考慮しても)1サーバ上でデータを持てるくらいインデックスデータが小さく保てる
  • インデックスのリアルタイムな更新が必要ないため、障害時のデータ保証を考慮したデータの永続化が比較的簡素で済む

この場合、複数台によるクラスタを組まずとも、Elasticsearchを単なる1ノードの検索アプリケーションと見てデータは外部ストレージ上に永続化することで、コンテナオーケストレーション環境上で他のRailsなどのアプリケーションと同じように可用性とリクエストに対するスケーラビリティを担保したデプロイができます。

EC2上にデプロイすることになると、物理サーバによるクラスタのメンテナンスコストがかかってしまうため、S3上でインデックスデータを管理し、ECS上にデプロイすることにしました。

f:id:chie8842:20191118113708p:plain

クックパッドマートは、他機能もECS上で動作しています。検索サーバも管理を同じ環境にすることでDockerのデプロイに慣れているメンバであれば検索サーバのデプロイ学習コストを小さくとどめることができました。

精度のチューニング

上述したとおり、今回の検索サーバは、エリア内の生鮮食品の検索という特性からそもそも適合するアイテム数が少なく、それらをすべて検索結果に出すことが大切になります。一方で明らかに関係のないアイテムが混じりすぎるのも検索体験的によくありません。今回のキーワード検索においてこのバランスを保つために開発当初から今までに行った主な改善を書いておきます。

商品検索時に利用するテキストの選択

検索のインデックスはサービスのデータベース上にある商品データから作成し、それに対して検索クエリを実行します。検索対象とすべきテキストデータとしては、主に以下がありました。

  • 商品タイトル
  • 商品の説明
  • カテゴリ情報
  • ショップの名前

当初は重みをつけた上で、上記のデータをすべて使うことを考えましたが、現在は商品検索には商品タイトル、カテゴリ情報、ショップ名を利用しています。 理由は以下のとおりです。

商品の説明を利用しないことにした理由

「商品の説明」のデータでは、例えば野菜の商品の説明で、「豚肉と炒めるとおいしいです」といった文章が出てくることがあります。こうした場合、「豚肉」と検索したときにこの野菜の商品がヒットしてしまいます。このような要因によって検索体験が下がる影響が大きいだろうという判断をしました。

カテゴリ情報を利用する理由

クックパッドマートには、「とり皮」、「砂肝」など、焼き鳥の串の商品があります。(ちなみにおいしかったので買ってみてください。) しかし商品タイトルのみに対して検索を行う場合、「鶏肉」というクエリに対してこれらの焼き鳥はヒットしません。カテゴリ情報を利用すると、これらの焼き鳥の商品に対して、「鶏肉」や「肉」といったテキスト情報を利用することができるようになり、「鶏肉」の検索結果に焼き鳥の商品をヒットさせることができるようになりました。

ショップの名前を利用する理由

ショップの名前はリリース時には商品検索のためのインデックスとしては入れていませんでした。しかしながらリリース後のユーザのクエリをみると、ショップ名で商品を検索しているユーザが一定数いることが判明しました。そこでショップ名からも商品を検索できるようにしました。

Analyzerのチューニング

検索結果の再現率を上げるためには、Analyzerのチューニングも重要です。チューニングでは、主に以下のことを行いました。

Tokenizerにkuromoji-ipadic-neologdとNGram Tokenizerを併用する

Elasticsearchにおける日本語に特化したTokenizerには、Elasticsearch本体に組み込まれているkuromoji_tokenizer(辞書はIPADic)の他に、形態素解析エンジンには同じkuromojiを利用してNEologd、UniDicといった別の辞書を利用するものや、最近作られたSudachiという形態素解析エンジンを利用するものがあります。また、辞書にない単語をとってくる方法として、NGram Tokenizerなどがあります。今回は、語彙数が多いkuromoji_ipadic_neologd TokenizerとNGram Tokenizerを重み付けして併用することで、できるだけ多く適合するアイテムをヒットさせることを目指しました。 なお、辞書としてUniDicを利用した方が細かい単位で単語を取得できる可能性もありますが、今回のようなヒットするアイテム数が少ないケースでは、検索結果のランク順くらいしか違いがでない、かつランク順がそれほど重要でないだろうと予想されるため試していません。

シノニムの重要性

上で例を出したとおり、じゃがいもを購入したいとき、検索結果には「メークイン」や「男爵いも」も出した方がいいでしょう。 このようにクックパッドマートでは、シノニムを考慮した検索が重要でした。 クックパッド本体で利用している辞書資源から必要なデータを取得できたので、最初のリリース後にシノニム情報を追加しました。

チューニング結果

クックパッドマートの検索精度は、リリース後にプロダクト側からの意見をヒアリングしつつ改善しました。 サービス自体がまだ若く、A/Bテストを行うような基盤はないため、別期間での比較になってしまいますが、リリース直後(チューニング前)では検索画面から目的の商品に出会えた確率(検索画面から商品ページへの遷移率と検索画面からカートイン率の合計)が 61% だったのが、チューニング後には 84% まで増加することができました。

今後の課題

検索システムは一度作って終わりというわけにはいきません。今後もユーザやプロダクトの成長に合わせて精度の改善を行う必要があります。また、今後は今の商品検索以外の検索機能が必要となる可能性もあります。 サービスの成長にあわせて検索の精度改善や機能追加を進めていけるとよいと思います。

最後に

検索に限らず、よいシステムをつくるには必要な機能を決めるためにUI・UXを考えるデザイナ、プロダクトオーナの協力が大切です。今回の検索システム構築においては、ryo-katsumaをはじめとしたクックパッドマートのプロダクトチームが明確な検索ストーリーを提示してくれ、また積極的に検索システムを使って実際に使って課題や要望を伝えてくれたため、スムーズに要件を固めて改善につなげることができました。 また、検索精度のチューニングについては、同じ研究開発部の@takahi-iが親身にレビューをしてくれました。

このようにサービスのドメイン知識と技術知識をもつメンバ同士で連携できたことで、スピード感をもってよい検索システムを作ることができたと思います。