概要
reactorのMonoのsubscribeは、非同期実行のイメージがあります。ですが、このイメージでsubscribeを使うと問題が発生することがあります。この記事では、subscribeがどのスレッドで実行されるかを簡単なサンプルを用いて考察します。
サンプルでの動作確認
単なるsubscribe
@Test
public void testSubscribeSameThread() throws Exception {
__logger.info("BEGIN");
var mono = Mono.create(s -> {
var value = "good";
__logger.info("CREATE value=<{}>", value);
s.success(value);
});
mono.subscribe(value -> {
__logger.info("SUBSCRIBE value=<{}>", value);
});
__logger.info("END");
}
このサンプルを実行すると以下のログが出力されます。
08:41:52.010 [main] - BEGIN
08:41:52.077 [main] - CREATE value=<good>
08:41:52.079 [main] - SUBSCRIBE value=<good>
08:41:52.079 [main] - END
このログの[main]はスレッド名です。テストを開始したスレッドとsubscribeが実行されたスレッドは同じスレッドです。
subscribeOnで実行スレッドを指定したsubscribe
@Test
public void testSubscribeRunOnOtherThread() throws Exception {
__logger.info("BEGIN");
var latch = new CountDownLatch(1);
var mono = Mono.create(s -> {
var value = "good";
__logger.info("CREATE value=<{}>", value);
s.success(value);
}).subscribeOn(Schedulers.single());
mono.subscribe(value -> {
__logger.info("SUBSCRIBE value=<{}>", value);
latch.countDown();
});
latch.await();
__logger.info("END");
}
このサンプルを実行すると以下のログが出力されます。
08:43:50.745 [main] - BEGIN
08:43:50.818 [main] - END
08:43:50.822 [single-1] - CREATE value=<good>
08:43:50.825 [single-1] - SUBSCRIBE value=<good>
ちゃんと別スレッドで実行されています。
非同期処理が途中に含まれるsubscribe
@Test
public void testSubscribeStartWithOtherThread() throws Exception {
__logger.info("BEGIN");
var latch = new CountDownLatch(1);
var mono = Mono.create(s -> {
var value = "good";
Mono.just(value).subscribeOn(Schedulers.single()).subscribe(v -> {
__logger.info("CREATE value=<{}>", value);
s.success(v);
});
});
mono.subscribe(value -> {
__logger.info("SUBSCRIBE value=<{}>", value);
latch.countDown();
});
latch.await();
__logger.info("END");
}
このサンプルを実行すると以下のログが出力されます。
08:50:55.570 [main] - BEGIN
08:50:55.642 [main] - END
08:50:55.645 [single-1] - CREATE value=<good>
08:50:55.648 [single-1] - SUBSCRIBE value=<good>
create時に、success()を読んだスレッドとsubscribeが実行されたスレッドが同じになっています。
処理の流れ
上記からsubscribeは以下のような処理になっていると推測されます。- subscribeの呼び出しスレッド(main)で、Mono.createで指定したlambda式が実行
- lambda式内のMono.subscribeが別スレッド(single-1)で実行
- success()が別スレッド(single-1)で実行
- success()と同じスレッドで最初のMonoのsubscribeが実行