JavaのReactorのFluxでフロー制御可能なPublisherの実装を試して見ました。
目次
環境
Java 11
Reactor Core 3.4.0
Publisherの実装
以下のコードで並行処理/フロー制御ありのFluxと正しく連携出来ました。
private static class _Publisher implements Publisher<String> {
private Lock _lock = new ReentrantLock();
// request数を超えた分は貯めておきます。
private LinkedList<String> _datum = new LinkedList<>();
// subscribeされた時点でデータの出力先として保持します(今回は複数回のサブスクライブには対応しません)。
private Subscriber<? super String> _subscriber;
// 指定されたrequest数に足りなかった分です。こちらが0になるまでは、データ登録時点でsubmitします。
private long _rest;
// 終了フラグです。
private boolean _completed;
// Fluxから、呼ばれます。
@Override
public void subscribe(Subscriber<? super String> s) {
_subscriber = s;
// 送信を制御するためのCallbackを登録します。
s.onSubscribe(new Subscription() {
// Flux側が処理可能になると呼ばれます。request数(n)は初期値はLongの最大値です。Flux側でlimitRate等を設定する事でnの値を変更可能です。
@Override
public void request(long n) {
_lock.lock();
try {
__logger.debug("request BEGIN count=<{}>", n);
_rest = _rest + n;
_consume();
// データが残っている間はcompleteを発行しません。
if (_completed && _datum.isEmpty()) {
__logger.debug("onComplete CALLED");
s.onComplete();
}
__logger.debug("request END");
} finally {
_lock.unlock();
}
}
// cancelはエラー等が発生してFluxが途中終了した場合に呼ばれます
@Override
public void cancel() {
__logger.debug("cancel CALL");
_datum.clear();
}
});
}
// Publisherが発行するデータを追加します。
public void add(String data) {
__logger.debug("add CALLED data=<{}>", data);
_lock.lock();
try {
if (_rest == 0) {
_datum.add(data);
} else {
_subscriber.onNext(data);
}
} finally {
_lock.unlock();
}
}
// すべてのデータの追加が完了した事を示します。
public void complete() {
__logger.debug("complete CALLED");
_lock.lock();
try {
_completed = true;
if (_subscriber != null) {
_consume();
_subscriber.onComplete();
}
} finally {
_lock.unlock();
}
}
private void _consume() {
while (_rest > 0) {
var data = _datum.poll();
if (data != null) {
__logger.debug("onNext CALLED data=<{}>", data);
_rest--;
_subscriber.onNext(data);
} else {
break;
}
}
}
}
Publisherの使用イメージ
var publisher = new _Publisher();// こちらを実装していく
var latch = new CountDownLatch(1);
// Fluxをpublisherから作成する。
// limitRate(2)を指定しているので、requestには2が渡される
// PublisherのonNextが呼ばれると、data=のログが出力される。
Flux.from(publisher). //
limitRate(2). // request数を制限する
parallel(3).runOn(Schedulers.parallel()). // 並行実行の指定(多重度3)
doOnComplete(() -> latch.countDown()). //
subscribe(d -> __logger.info("data=<{}>", d));
// Fluxを作成後にpublisherにデータを追加していく。
for (int i = 0; i < 5; i++) {
publisher.add("data_" + i);
}
publisher.complete();
try {
Assertions.assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
}
実行結果
limitRate = 2
以下のようにrequestが2で呼ばれて、それで指定された件数のonNext(data=)がPublisherで呼ばれた時点で次のrequestが呼ばれています。
18:39:04.207 [main] - request BEGIN count=<2>
18:39:04.210 [main] - request END
18:39:04.210 [main] - add CALLED data=<data_0>
18:39:04.210 [main] - data=<data_0>
18:39:04.210 [main] - add CALLED data=<data_1>
18:39:04.211 [main] - data=<data_1>
18:39:04.211 [main] - request BEGIN count=<2>
18:39:04.211 [main] - request END
18:39:04.211 [main] - add CALLED data=<data_2>
18:39:04.211 [main] - data=<data_2>
18:39:04.211 [main] - add CALLED data=<data_3>
18:39:04.211 [main] - data=<data_3>
18:39:04.211 [main] - request BEGIN count=<2>
18:39:04.211 [main] - request END
18:39:04.211 [main] - add CALLED data=<data_4>
18:39:04.211 [main] - data=<data_4>
18:39:04.211 [main] - complete CALLED
limitRate指定なし
requestがLongの最大値で呼ばれています。
18:38:18.689 [main] - request BEGIN count=<9223372036854775807>
18:38:18.694 [main] - request END
18:38:18.694 [main] - add CALLED data=<data_0>
18:38:18.695 [main] - data=<data_0>
18:38:18.696 [main] - add CALLED data=<data_1>
18:38:18.696 [main] - data=<data_1>
18:38:18.696 [main] - add CALLED data=<data_2>
18:38:18.696 [main] - data=<data_2>
18:38:18.696 [main] - add CALLED data=<data_3>
18:38:18.696 [main] - data=<data_3>
18:38:18.696 [main] - add CALLED data=<data_4>
18:38:18.696 [main] - data=<data_4>
18:38:18.696 [main] - complete CALLED
並行実行(多重度3)/limitRate = 2
出力(data=)が複数のスレッドで実行されています。
18:45:53.965 [main] - request BEGIN count=<2>
18:45:53.969 [main] - request END
18:45:53.969 [main] - add CALLED data=<data_0>
18:45:53.970 [main] - add CALLED data=<data_1>
18:45:53.970 [main] - request BEGIN count=<2>
18:45:53.970 [parallel-1] - data=<data_0>
18:45:53.970 [main] - request END
18:45:53.971 [main] - add CALLED data=<data_2>
18:45:53.971 [parallel-2] - data=<data_1>
18:45:53.973 [main] - add CALLED data=<data_3>
18:45:53.973 [parallel-3] - data=<data_2>
18:45:53.973 [main] - request BEGIN count=<2>
18:45:53.973 [main] - request END
18:45:53.973 [main] - add CALLED data=<data_4>
18:45:53.973 [parallel-1] - data=<data_3>
18:45:53.973 [main] - add CALLED data=<data_5>
18:45:53.973 [parallel-2] - data=<data_4>
18:45:53.973 [main] - request BEGIN count=<2>
18:45:53.973 [main] - request END
18:45:53.973 [main] - add CALLED data=<data_6>
18:45:53.973 [parallel-3] - data=<data_5>
18:45:53.973 [parallel-1] - data=<data_6>
18:45:53.973 [main] - add CALLED data=<data_7>
18:45:53.973 [main] - request BEGIN count=<2>
18:45:53.974 [main] - request END
18:45:53.974 [main] - add CALLED data=<data_8>
18:45:53.974 [parallel-2] - data=<data_7>
18:45:53.974 [main] - add CALLED data=<data_9>
18:45:53.974 [parallel-3] - data=<data_8>
18:45:53.974 [main] - request BEGIN count=<2>
18:45:53.974 [main] - request END
18:45:53.974 [main] - complete CALLED
18:45:53.974 [parallel-1] - data=<data_9>
Sinkを使った実装
Fluxのフロー制御の理解を深めるために、Publisherを自作して見ましたが、Reactor Coreにこれと同等の処理を簡単に実装する仕組みが用意されています。
var sink = Sinks.many().multicast().onBackpressureBuffer(10);
var latch = new CountDownLatch(1);
// sinkはPublisherではありませんが、以下の仕組みでFluxに変換出来ます。
sink.asFlux().parallel(2, 3).runOn(Schedulers.parallel()).doOnComplete(() -> latch.countDown())
.subscribe(d -> __logger.info("data=<{}>", d));
for (int i = 0; i < 10; i++) {
sink.tryEmitNext("data_" + i);
}
sink.tryEmitComplete();
try {
Assertions.assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
}
結論
自分でPublisherを実装してFluxのフロー制御の動作を確認しました。Fluxでフロー制御を正しく行うためには、Publisher側でrequestを正しく実装する必要があります。この辺を簡単にするために、Reactor Coreでは、Sinkという仕組みが用意されているようです。