会員事業部の山下(@tomorrowkey)です。
RxJavaが流行ってますね。最近Android版クックパッドでもRxJavaが導入されました。この記事は私がRxJavaを使うにあたって検証用のテストコードを書いたものをベースに、RxJavaの挙動をみなさんに紹介したいと思います。
目次
- リスト操作でおさらいする基本的なRxJavaの使い方
- Observable
- Operator
- Observer / Subscribe
- 実行順序を確認するサンプルプログラム
- 7つのサンプルプログラム
リスト操作でおさらいする基本的なRxJavaの使い方
RxJavaはAPIアクセスやイベントトリガーやリスト処理などを多岐にわたる処理に使うことができます。このエントリでは初学者に一番分かりやすいリストの処理を例に解説します。
これは1から10までの値を渡し、偶数だけにフィルタリングしたうえ、値を10倍にして、ログ出力するというプログラムです。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) // 1 .filter(new Func1<Integer, Boolean>() { // 2 @Override public Boolean call(Integer i) { return (i % 2) == 0; } }) .map(new Func1<Integer, Integer>() { // 2 @Override public Integer call(Integer i) { return i * 10; } }) .subscribe(new Observer<Integer>() { // 3 @Override public void onNext(Integer integer) { Log.d("Hoge", integer.toString()); } @Override public void onCompleted() { Log.d("Hoge", "completed"); } @Override public void onError(Throwable e) { } });
実行してみると以下のように出力されます。
20 40 60 80 100 completed
RxJavaの大まかな流れを説明すると以下のようになります。
- Observableを作る
- filterやmapなどのOperatorを使って値を加工する
- Observerを使ってObservableをsubscribeする
1つずつ解説します。
1.Observableを作る
データの元となるものをデータソースといいます。これはAPIのレスポンスだったり、ディスクに保存されているファイルだったり、単純にメモリ上の変数だったりします。
RxJavaではまずデータソースを提供するObservableを作る必要があります。
データソースを提供するObservableを作成する為のstaticメソッドがいくつか定義されています。今回は説明しませんが、データソースを提供するObservableは自作することができ、ファイルアクセスやAPIアクセスする場合は、自作する必要があります。
Observableを作るメソッドをいくつか紹介しましょう。
from
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
サンプルプログラムで使われているメソッドです。 配列を渡すことで、各要素をOperatorに渡します。 配列の他にListやIterableなどのOverloadがあります。
just
Observable.just(1, 5, 6)
10個までのオブジェクトをOperatorに渡します。 可変長引数ではないので、それ以上のオブジェクトを渡すことはできません。 配列やListではないオブジェクトをObservable化したい場合に使います。
range
Observable.range(1, 10)
startからcountまでのint値をOperatorに渡します。 サンプルプログラムではわざわざIntegerの配列を作っていましたが、これを使うことでより簡潔に書くことができます。
2.Operatorを使って値を加工する
Observableで作成されたデータを1つずつ受け取り、加工したり、フィルタリングしたり、他のObservableとマージしたりなどします。 いくつかのOperatorを紹介します。
filter
Observable.filter(new Func1<Ingeger, Boolean>() { @Override public Boolean call(Integer i) { return (i % 2) == 0; } })
名前の通り値をフィルタリングするOperatorです。trueを返せばその値を採用し、falseを返せばその値を取り除きます。 このコードではリストの値が偶数だけになるようにフィルタリングしています。
map
Observable.map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { return i * 10; } })
受け取った値を違う値に変換するOperatorです。 このサンプルプログラムではIntegerの値を10倍にしています。 値の変換だけではなく、例えばIntegerからStringにしたりなど、違う型に変換することもできます。
3.Observerを使ってObservableをsubscribeする
ObserverではOperatorで加工した値を受け取ります。
onNext()は1つの値の処理が終わる度に実行されます。 onCompleted()ではすべての値の処理が終わったら実行されます。 onError()は一連の流れの中で例外が発生した時に呼ばれます。
Observable.subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { Log.d("Hoge", integer.toString()); } @Override public void onCompleted() { Log.d("Hoge", "completed"); } @Override public void onError(Throwable e) { } })
Observable.subscribe()を実行することで、Observableがデータソースの準備をしてOperatorにデータを渡します。 リストをデータソースとして指定した場合、すぐにデータを提供できるのですぐに実行されます。 サンプルプログラムを分解して、解説すると
Observable observable = Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer i) { return (i % 2) == 0; } }); // 1. まだobservableは実行されない observable = observable.filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer i) { return (i % 2) == 0; } }) // 2. まだobservableは実行されない observable.subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }); // 3. 実行される
1、2ではsubscribeするためのものを作っている段階なので実行されませんが、3で初めてsubscribeされるので一連の流れが実行されます。
サンプルプログラムをいくつか見よう
前節でRxJavaの基本的な動きはなんとなく分かったんじゃないかなと思います。
ここからは実行順序に関する7つのサンプルプログラムを提示します。変数の値がどのようになるか想像してみましょう。
ちなみにこれらのサンプルプログラムはこちらで公開しています。 https://github.com/tomorrowkey/RxAndroidTest
サンプルプログラム1
まずは簡単なサンプルプログラムです。
sb
の内容はどうなるでしょうか
final StringBuilder sb = new StringBuilder(); Observable.just(1) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { sb.append("1"); return i; } }) .subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { sb.append("2"); } @Override public void onCompleted() { sb.append("3"); } @Override public void onError(Throwable e) { sb.append("4"); } }); sb.append("5");
テストコード1
assertThat(sb.toString(), is("1235"));
前述の説明を理解できていれば簡単に分かったと思います。
Integerの値1
をデータソースにリスト処理します。mapのOperatorが1度だけ実行され、ObserverのonNext()、onCompleted()の流れで実行されます。
サンプルプログラム2
今度は実行順序を確認するためにsleepを入れてみましょう。
sb
の内容はどうなるでしょうか
final CountDownLatch latch = new CountDownLatch(1); final StringBuilder sb = new StringBuilder(); Observable.just(1) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { sleep(500); sb.append("1"); return i; } }) .subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { sb.append("2"); } @Override public void onCompleted() { sb.append("3"); latch.countDown(); } @Override public void onError(Throwable e) { sb.append("4"); } }); sb.append("5"); latch.await(10, TimeUnit.SECONDS);
テストコード2
assertThat(sb.toString(), is("1235"));
特に何も指定をしなければ、subscribeを実行した時点で実行した時のスレッドを使って最後まで実行されるため、サンプルプログラム1と同じ挙動になります。
subscribeOn
RxJavaの一連の処理を実行するスレッドを指定したければ、subscribeOnを使います。
Observable.subscribeOn(Scheduler)
例えばAndroidのmainスレッドを使いたければ以下の様に指定します。
Observable.subscribeOn(AndroidSchedulers.mainThread());
mainではない別スレッドを使いたければ以下のように指定します。
Observable.subscribeOn(Schedulers.newThread());
リストの要素一つ一つに対してネットワーク処理を行いたい場合や、ディスクIOをしたい場合など、Operatorで重たい処理をしたい時には、スレッドを指定できるので便利ですね。
サンプルプログラム3
subscribeOnを使った際の実行順序を確認してみましょう。
sb
の内容はどうなるでしょうか
final CountDownLatch latch = new CountDownLatch(1); final StringBuilder sb = new StringBuilder(); Observable.just(1) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { sleep(500); sb.append("1"); return i; } }) .subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { sb.append("2"); } @Override public void onCompleted() { sb.append("3"); latch.countDown(); } @Override public void onError(Throwable e) { sb.append("4"); } }); sb.append("5"); latch.await(10, TimeUnit.SECONDS);
テストコード3
assertThat(sb.toString(), is("5123"));
Operatorはmainではないスレッドで500msec待ってから実行されるので、先にmainスレッドが実行されました。
サンプルプログラム4
さらにどの部分がどのスレッドで実行されるか確認しましょう。
list
の内容はどうなるでしょうか。
final CountDownLatch latch = new CountDownLatch(1); final List<String> list = new ArrayList<>(); Observable.just(1) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { list.add("1:" + Thread.currentThread().getName()); return i; } }) .subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { list.add("2:" + Thread.currentThread().getName()); } @Override public void onCompleted() { list.add("3:" + Thread.currentThread().getName()); latch.countDown(); } @Override public void onError(Throwable e) { list.add("4:" + Thread.currentThread().getName()); } }); latch.await(10, TimeUnit.SECONDS);
テストコード4
assertThat(list.size(), is(3)); assertThat(list.get(0), is(matches("1:RxNewThreadScheduler-\\d"))); assertThat(list.get(1), is(matches("2:RxNewThreadScheduler-\\d"))); assertThat(list.get(2), is(matches("3:RxNewThreadScheduler-\\d")));
subscribeOnでのスレッド指定はObserverまで影響受けるので、OperatorだけではなくObserverのメソッドも別スレッドで実行されました。
AndroidではUIスレッド以外でViewの更新ができないので、このまま通信処理などのために利用することはできませんね。
ObserveOn
observeOnを使えばObserverが指定されたメソッドで実行されます。
subscribeOnで別スレッドを指定し、Operatorで重たい処理を実行して、Observerで処理した内容をUIに反映したいといった時にobserveOnを使います。
サンプルプログラム5
observeOnを使った時の実行順序とスレッドを確認しましょう。
list
の内容はどうなるでしょうか
final CountDownLatch latch = new CountDownLatch(1); final List<String> list = new ArrayList<>(); Observable.just(1) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { list.add("1:" + Thread.currentThread().getName()); return i; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { list.add("2:" + Thread.currentThread().getName()); } @Override public void onCompleted() { list.add("3:" + Thread.currentThread().getName()); latch.countDown(); } @Override public void onError(Throwable e) { list.add("4:" + Thread.currentThread().getName()); } }); latch.await(10, TimeUnit.SECONDS);
テストコード5
assertThat(list.size(), is(3)); assertThat(list.get(0), is(matches("1:RxNewThreadScheduler-\\d+"))); assertThat(list.get(1), is("2:main")); assertThat(list.get(2), is("3:main"));
Operatorは別スレッドで実行され、Observerはmainスレッドで実行されるようになりました。 AsyncTaskのように見えてきませんか?だいぶRxJavaへ親しみがもてるようになってきたかと思います。
サンプルプログラム6
もうすこしsubscribeOnの挙動を見てみましょう。
2回subscribeOnを実行した場合にどうなるでしょうか。
final CountDownLatch latch = new CountDownLatch(1); final List<String> list = new ArrayList<>(); Observable.just(1) .subscribeOn(AndroidSchedulers.mainThread()) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { list.add("1:" + Thread.currentThread().getName()); return i; } }) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { list.add("2:" + Thread.currentThread().getName()); return i; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { list.add("3:" + Thread.currentThread().getName()); } @Override public void onCompleted() { list.add("4:" + Thread.currentThread().getName()); latch.countDown(); } @Override public void onError(Throwable e) { list.add("5:" + Thread.currentThread().getName()); } }); latch.await(10, TimeUnit.SECONDS);
テストコード6
assertThat(list.size(), is(4)); assertThat(list.get(0), is("1:main")); assertThat(list.get(1), is("2:main")); assertThat(list.get(2), is("3:main")); assertThat(list.get(3), is("4:main"));
subscribeOnはOperatorが実行されるスレッドを指定するもので、実行した時点でスレッドを変えるような効果はありません。 よって、先に実行されたmainスレッドの指定が採用され、すべてmainスレッドで実行されました。
サンプルプログラム7
さきほどのサンプルプログラムでだいたいsubscribeOnの挙動は分かったと思いますが、もう1つだけ確認してみましょう。 途中からsubscribeOnを指定した場合どうなるでしょうか。
final CountDownLatch latch = new CountDownLatch(1); final List<String> list = new ArrayList<>(); Observable.just(1) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { list.add("1:" + Thread.currentThread().getName()); return i; } }) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, Integer>() { @Override public Integer call(Integer i) { list.add("2:" + Thread.currentThread().getName()); return i; } }) .subscribe(new Observer<Integer>() { @Override public void onNext(Integer integer) { list.add("3:" + Thread.currentThread().getName()); } @Override public void onCompleted() { list.add("4:" + Thread.currentThread().getName()); latch.countDown(); } @Override public void onError(Throwable e) { list.add("5:" + Thread.currentThread().getName()); } }); latch.await(10, TimeUnit.SECONDS);
テストコード7
assertThat(list.size(), is(4)); assertThat(list.get(0), is(matches("1:RxNewThreadScheduler-\\d+"))); assertThat(list.get(1), is(matches("2:RxNewThreadScheduler-\\d+"))); assertThat(list.get(2), is(matches("3:RxNewThreadScheduler-\\d+"))); assertThat(list.get(3), is(matches("4:RxNewThreadScheduler-\\d+")));
途中でsubscribeOnを指定したとしても最初のOperatorからスレッドが変わります。
でも途中にsubscribeOnを書くと可読性が落ちるので最初に書くと分かりやすいでしょう。
最後に
リスト操作を通じてRxJavaの挙動を確認しました。
今回は実行順序を確認するために最低限の説明しかしませんでしたが、その他にRxJavaをAndroid向けに機能追加したRxAndroidがあったり、オリジナルのOperatorを提供する拡張ライブラリもあります。RxJavaを使いこなせるようになった!と言えるようになるためにはまだまだやることは多そうです。
RxJavaはJavaにはない独自の世界があり、いままでJavaやAndroidのコードを書いてきた人にとってはどこかとっつきにくいところがあるように思いますが、このエントリを通してすこしでも理解が深まればと思います。
クックパッドではRxJavaを始めその他最新ライブラリを駆使して、スピーディに高品質なモバイルアプリを作っていくモバイルエンジニアを募集しています!興味がある方はぜひご応募ください。
iOS/Android アプリエンジニア | クックパッド株式会社 採用情報