これはRxJava Advent Calendar 2016の第12日目の記事です。

RxJavaは非同期処理を扱うライブラリですが、それ故にRxJavaのからむ単体テストは一筋縄でいかないところがあります。この記事と次の記事では、RxJavaでの非同期処理テスト特有の問題と、その解決方法を説明します。なお、基本的に RxJava 1.x を対象としていますが、必要に応じて RxJava 2.x についても触れます。

RxJava logo image

非同期処理の単体テストの難しさ

非同期処理の単体テストは一般に難しいものです。試しに以下のコードを実行してみましょう。

  @Test public void testDoSomething() {
    new AsyncService().doSomething()
        .subscribe(s -> assertEquals("success", s));
  }

  private static class AsyncService {
    public Observable<String> doSomething() {
      return Observable.just("success").delay(1, TimeUnit.SECONDS);
    }
  }

結果はどうなるでしょうか?テストは目出たくパスします🎉 しかし、実際にはこの assertEquals は実行されないため、このテストは無意味です👹 試しに "false" を返すようにしてみましょう。残念ながらテストは成功してしまいます。

これは非同期処理が別スレッドで行われるため、メインスレッドはそれを待たずに終了してしまうためです。

このようなとき、1番簡単な方法は Thread#sleep(long) でしばらく待つことですが、では、どれくらい待ったら良いでしょうか。数秒、あるいは数十秒でしょうか?適当な値を入れても良いですが、無駄に待てば、それだけテストの実行時間がかかり、CIの時間も長くなります。できたら、完了したら即座に assert したいところです。

TestSubscriberとtest()メソッド

実はRxJavaを使っていれば、上に挙げた例はそれほど難しくありません。なぜなら、RxJavaには blocking() という非同期処理を同期処理に変換するメソッドがあるからです。

さきほどのテストは以下のようになります。

    new AsyncService().doSomething()
        .toBlocking()
        .subscribe(s -> assertEquals("success", s));

これで万事解決でしょうか?いえ、より細かいテスト、たとえば onCompleted が呼ばれたかどうか、エラーを返すかどうかといったテストを書きたい場合は、これでは煩雑になってしまいます。それに、SubjectのようなonCompletedが永遠に呼ばれないケースをテストできません。

そこでTestSubscriberの登場です。

TestSubscriber

TestSubscriberは、RxJavaに付属している単体テスト用のSubscriberです。

さきほどの例であれば、以下のように使うことができます。

    final TestSubscriber<String> subscriber = TestSubscriber.create();
    new AsyncService().doSomething().subscribe(subscriber);
    
    subscriber.awaitTerminalEvent(); // onCompleted() もしくは onError() が呼ばれるまで待つ
    subscriber.assertValue("success"); // emit されたイベントは1つだけで "success" のはず
    subscriber.assertCompleted(); // onCompleted() が呼ばれていることを確認

TestSubscriber は内部に CountDownLatch を持っており、イベントを一定数受け取ったり、あるいは onError()onCompleted() が呼ばれるまで待つことができます。この待つステップは awaitXXX というメソッドがあるので、それを使いましょう。

さらに、受け取ったイベントを検査することができるため、より細かいテストが可能となっています。

ちょっと便利な機能として、TestSubscriberには TestSubscriber#getLastSeenThread() という最後に実行されたスレッドを取得するメソッドがあります。これを使えば期待通りのスレッドでsubscribeが実行されることを保証できます。

test()メソッド

さらに、RxJava 2.x もしくは RxJava 1.2.3 以降では、 test() メソッドを使うことができます。この test() メソッドは AssertableSubscriber と呼ばれる TestSubscriber と同様のものを返し、それに対して様々な assertion を実行することができます。 AssertableSubscriber はメソッドチェインも出来るので、よりスッキリ書くことができます。

    new AsyncService().doSomething()
        .test()
        .awaitTerminalEvent()
        .assertValue("success")
        .assertCompleted();

TestSubscirberやtest()では足りない場合

TestSubscriberやtest()メソッドは便利ですが、それだけでは足りないことがあります。

たとえば、処理Aが完了する前に処理Bが走ったときの挙動を確認したい場合はどうしたら良いでしょうか。ご存知のように、RxJavaのObservableの多くはsubscribeされたタイミングで非同期処理が実行されます。その場合は、いくらobserve側をブロックしても、処理Aは完了してしまいます。

以下のようなテストを考えましょう。

1   @Test public void testCustomDoSomething() {
2     final TestSubscriber<String> subscriber = TestSubscriber.create();
3     final AsyncService service = new AsyncService();
4     service.doSomething().subscribe(subscriber);
5
6     // 返す値をセットする
7     service.value = "another_success";
8
9     subscriber.awaitTerminalEvent();
10    subscriber.assertValue("another_success");
11  }
12
13  private static class AsyncService {
14    String value = "success";
15
16    public Observable<String> doSomething() {
17      return Observable.defer(() -> Observable.just(value)).delay(1, TimeUnit.SECONDS);
18    }
19  }

このテストは失敗します。なぜなら、4行目で最初にsubscribeしたタイミングで AsyncService は初期値の value である "success" を返してしまうからです。いくら9行目でawaitしても、すでに処理は完了しているので、期待する結果 "another_success" は返ってきません。

ここで、 Observable#defer(Func0) を使って、subscribeされたタイミングで value を読み込んでいるのに注意しましょう。こうすることで、この doSomething() で返されるObservableに対してsubscribeが実行されるタイミングで初めて AsyncService#value を読み込んでくれるのです。

TestScheduler

では、どうやって、実際のsubscribeのタイミングをコントロールすれば良いでしょうか?RxJavaはこの手段も用意しています。TestSchedulerを使えば良いのです。

さきほどのテストは以下のように書けます。

    final TestScheduler scheduler = new TestScheduler();
    service.doSomething().subscribeOn(scheduler).subscribe(subscriber);

    // 返す値をセットする
    service.value = "another_success";

    // スケジューラを進める
    scheduler.triggerActions();

    subscriber.awaitTerminalEvent();
    subscriber.assertValue("another_success");

ここで subscribeOn() を呼び出していることに注意しましょう。subscribe時のスケジューラを TestScheduler にすることで、実際のsubscribeのタイミングをコントロールすることが出来るのです。

まとめ

この記事ではRxJavaを使った非同期処理のテスト方法について説明しました。最初に述べたように別スレッドで実行する処理のテストは、それ自体が難しいものです。RxJavaは幸い、TestSubscriber、test()、TestSchedulerといった便利なユーティリティがあるおかげで多少楽になっています。

これらのユーティリティについて改めて振り返ってみると、以下のような使い分けになっていることに気付くでしょう。

  • TestSubscriberとtest()メソッド:observe側のスレッドをコントロールする
  • TestScheduler:subscribe側のスレッドをコントロールする

RxJavaを使っているなら、テスト対象のメソッドの多くはObservableを返すはずです。そして、私たちがテストでコントロールできるのは、そのobserve側とsubscribe側しかありません。では、この両者があればRxJavaのからむテストは万全でしょうか?

いままでの方法では足りない場合

残念ながら、Observableを返してくれないメソッドのテストをしたいケースもあります。内部でsubscribeを行っている場合などです。次の記事では、そのような場合にどのようにテストするか述べます。