つながりを開発するブログ

Monoのsubscribeの実行スレッドについての考察

2021/05/27
By Yoshitaro Takahashi

概要

reactorのMonoのsubscribeは、非同期実行のイメージがあります。ですが、このイメージでsubscribeを使うと問題が発生することがあります。この記事では、subscribeがどのスレッドで実行されるかを簡単なサンプルを用いて考察します。

サンプルでの動作確認

単なるsubscribe

    @Test
    public void testSubscribeSameThread()  throws Exception {
        __logger.info("BEGIN");
        var mono = Mono.create(s -> {
            var value = "good";
            __logger.info("CREATE value=<{}>", value);
            s.success(value);
        });
        mono.subscribe(value -> {
            __logger.info("SUBSCRIBE value=<{}>", value);
        });
        __logger.info("END");
    }

このサンプルを実行すると以下のログが出力されます。

08:41:52.010 [main] - BEGIN
08:41:52.077 [main] - CREATE value=<good>
08:41:52.079 [main] - SUBSCRIBE value=<good>
08:41:52.079 [main] - END

このログの[main]はスレッド名です。テストを開始したスレッドとsubscribeが実行されたスレッドは同じスレッドです。

subscribeOnで実行スレッドを指定したsubscribe

    @Test
    public void testSubscribeRunOnOtherThread() throws Exception {
        __logger.info("BEGIN");
        var latch = new CountDownLatch(1);
        var mono = Mono.create(s -> {
            var value = "good";
            __logger.info("CREATE value=<{}>", value);
            s.success(value);
        }).subscribeOn(Schedulers.single());
        mono.subscribe(value -> {
            __logger.info("SUBSCRIBE value=<{}>", value);
            latch.countDown();
        });
        latch.await();
        __logger.info("END");
    }

このサンプルを実行すると以下のログが出力されます。

08:43:50.745 [main] - BEGIN
08:43:50.818 [main] - END
08:43:50.822 [single-1] - CREATE value=<good>
08:43:50.825 [single-1] - SUBSCRIBE value=<good>

ちゃんと別スレッドで実行されています。

非同期処理が途中に含まれるsubscribe

    @Test
    public void testSubscribeStartWithOtherThread() throws Exception {
        __logger.info("BEGIN");
        var latch = new CountDownLatch(1);
        var mono = Mono.create(s -> {
            var value = "good";
            Mono.just(value).subscribeOn(Schedulers.single()).subscribe(v -> {
                __logger.info("CREATE value=<{}>", value);
                s.success(v);
            });
        });
        mono.subscribe(value -> {
            __logger.info("SUBSCRIBE value=<{}>", value);
            latch.countDown();
        });
        latch.await();
        __logger.info("END");
    }

このサンプルを実行すると以下のログが出力されます。

08:50:55.570 [main] - BEGIN
08:50:55.642 [main] - END
08:50:55.645 [single-1] - CREATE value=<good>
08:50:55.648 [single-1] - SUBSCRIBE value=<good>

create時に、success()を読んだスレッドとsubscribeが実行されたスレッドが同じになっています。

処理の流れ

上記からsubscribeは以下のような処理になっていると推測されます。

  1. subscribeの呼び出しスレッド(main)で、Mono.createで指定したlambda式が実行
  2. lambda式内のMono.subscribeが別スレッド(single-1)で実行
  3. success()が別スレッド(single-1)で実行
  4. success()と同じスレッドで最初のMonoのsubscribeが実行

結論

subscribeはそのまま実行した場合には非同期にはなりません。subscribeを実行したMonoの処理が非同期ならsubscribeはそこから非同期処理になります。subscribeを明示的に非同期実行したい場合には、subscribeOnメソッドで実行スレッドを指定する必要があります。

タイトルとURLをコピーしました