FluxのソースとなるPublisherを実装してみる

353
NO IMAGE

JavaのReactorのFluxでフロー制御可能なPublisherの実装を試して見ました。

環境

Java 11
Reactor Core 3.4.0

Publisherの実装

以下のコードで並行処理/フロー制御ありのFluxと正しく連携出来ました。

    private static class _Publisher implements Publisher<String> {

        private Lock _lock = new ReentrantLock();

        // request数を超えた分は貯めておきます。
        private LinkedList<String> _datum = new LinkedList<>();

        // subscribeされた時点でデータの出力先として保持します(今回は複数回のサブスクライブには対応しません)。
        private Subscriber<? super String> _subscriber;

        // 指定されたrequest数に足りなかった分です。こちらが0になるまでは、データ登録時点でsubmitします。
        private long _rest;

        // 終了フラグです。
        private boolean _completed;

        // Fluxから、呼ばれます。
        @Override
        public void subscribe(Subscriber<? super String> s) {
            _subscriber = s;
            // 送信を制御するためのCallbackを登録します。
            s.onSubscribe(new Subscription() {

     // Flux側が処理可能になると呼ばれます。request数(n)は初期値はLongの最大値です。Flux側でlimitRate等を設定する事でnの値を変更可能です。
                @Override
                public void request(long n) {
                    _lock.lock();
                    try {
                        __logger.debug("request BEGIN count=<{}>", n);
                        _rest = _rest + n;
                        _consume();
                        // データが残っている間はcompleteを発行しません。
                        if (_completed && _datum.isEmpty()) {
                            __logger.debug("onComplete CALLED");
                            s.onComplete();
                        }
                        __logger.debug("request END");
                    } finally {
                        _lock.unlock();
                    }
                }

                // cancelはエラー等が発生してFluxが途中終了した場合に呼ばれます
                @Override
                public void cancel() {
                    __logger.debug("cancel CALL");
                    _datum.clear();
                }
            });
        }

        // Publisherが発行するデータを追加します。
        public void add(String data) {
            __logger.debug("add CALLED data=<{}>", data);
            _lock.lock();
            try {
                if (_rest == 0) {
                    _datum.add(data);
                } else {
                    _subscriber.onNext(data);
                }
            } finally {
                _lock.unlock();
            }
        }

        // すべてのデータの追加が完了した事を示します。
        public void complete() {
            __logger.debug("complete CALLED");
            _lock.lock();
            try {
                _completed = true;
                if (_subscriber != null) {
                    _consume();
                    _subscriber.onComplete();
                }
            } finally {
                _lock.unlock();
            }
        }

        private void _consume() {
            while (_rest > 0) {
                var data = _datum.poll();
                if (data != null) {
                    __logger.debug("onNext CALLED data=<{}>", data);
                    _rest--;
                    _subscriber.onNext(data);
                } else {
                    break;
                }
            }
        }

    }

Publisherの使用イメージ

        var publisher = new _Publisher();// こちらを実装していく
        var latch = new CountDownLatch(1);
// Fluxをpublisherから作成する。
// limitRate(2)を指定しているので、requestには2が渡される
// PublisherのonNextが呼ばれると、data=のログが出力される。
Flux.from(publisher). //
limitRate(2). // request数を制限する
parallel(3).runOn(Schedulers.parallel()). // 並行実行の指定(多重度3)
doOnComplete(() -> latch.countDown()). //
subscribe(d -> __logger.info("data=<{}>", d));
// Fluxを作成後にpublisherにデータを追加していく。
        for (int i = 0; i < 5; i++) {
            publisher.add("data_" + i);
        }
        publisher.complete();
        try {
            Assertions.assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
        }

実行結果

limitRate = 2

以下のようにrequestが2で呼ばれて、それで指定された件数のonNext(data=)がPublisherで呼ばれた時点で次のrequestが呼ばれています。

18:39:04.207 [main] - request BEGIN count=<2>
18:39:04.210 [main] - request END
18:39:04.210 [main] - add CALLED data=<data_0>
18:39:04.210 [main] - data=<data_0>
18:39:04.210 [main] - add CALLED data=<data_1>
18:39:04.211 [main] - data=<data_1>
18:39:04.211 [main] - request BEGIN count=<2>
18:39:04.211 [main] - request END
18:39:04.211 [main] - add CALLED data=<data_2>
18:39:04.211 [main] - data=<data_2>
18:39:04.211 [main] - add CALLED data=<data_3>
18:39:04.211 [main] - data=<data_3>
18:39:04.211 [main] - request BEGIN count=<2>
18:39:04.211 [main] - request END
18:39:04.211 [main] - add CALLED data=<data_4>
18:39:04.211 [main] - data=<data_4>
18:39:04.211 [main] - complete CALLED

limitRate指定なし

requestがLongの最大値で呼ばれています。

18:38:18.689 [main] - request BEGIN count=<9223372036854775807>
18:38:18.694 [main] - request END
18:38:18.694 [main] - add CALLED data=<data_0>
18:38:18.695 [main] - data=<data_0>
18:38:18.696 [main] - add CALLED data=<data_1>
18:38:18.696 [main] - data=<data_1>
18:38:18.696 [main] - add CALLED data=<data_2>
18:38:18.696 [main] - data=<data_2>
18:38:18.696 [main] - add CALLED data=<data_3>
18:38:18.696 [main] - data=<data_3>
18:38:18.696 [main] - add CALLED data=<data_4>
18:38:18.696 [main] - data=<data_4>
18:38:18.696 [main] - complete CALLED

並行実行(多重度3)/limitRate = 2

出力(data=)が複数のスレッドで実行されています。

18:45:53.965 [main] - request BEGIN count=<2>
18:45:53.969 [main] - request END
18:45:53.969 [main] - add CALLED data=<data_0>
18:45:53.970 [main] - add CALLED data=<data_1>
18:45:53.970 [main] - request BEGIN count=<2>
18:45:53.970 [parallel-1] - data=<data_0>
18:45:53.970 [main] - request END
18:45:53.971 [main] - add CALLED data=<data_2>
18:45:53.971 [parallel-2] - data=<data_1>
18:45:53.973 [main] - add CALLED data=<data_3>
18:45:53.973 [parallel-3] - data=<data_2>
18:45:53.973 [main] - request BEGIN count=<2>
18:45:53.973 [main] - request END
18:45:53.973 [main] - add CALLED data=<data_4>
18:45:53.973 [parallel-1] - data=<data_3>
18:45:53.973 [main] - add CALLED data=<data_5>
18:45:53.973 [parallel-2] - data=<data_4>
18:45:53.973 [main] - request BEGIN count=<2>
18:45:53.973 [main] - request END
18:45:53.973 [main] - add CALLED data=<data_6>
18:45:53.973 [parallel-3] - data=<data_5>
18:45:53.973 [parallel-1] - data=<data_6>
18:45:53.973 [main] - add CALLED data=<data_7>
18:45:53.973 [main] - request BEGIN count=<2>
18:45:53.974 [main] - request END
18:45:53.974 [main] - add CALLED data=<data_8>
18:45:53.974 [parallel-2] - data=<data_7>
18:45:53.974 [main] - add CALLED data=<data_9>
18:45:53.974 [parallel-3] - data=<data_8>
18:45:53.974 [main] - request BEGIN count=<2>
18:45:53.974 [main] - request END
18:45:53.974 [main] - complete CALLED
18:45:53.974 [parallel-1] - data=<data_9>

Sinkを使った実装

Fluxのフロー制御の理解を深めるために、Publisherを自作して見ましたが、Reactor Coreにこれと同等の処理を簡単に実装する仕組みが用意されています。

        var sink = Sinks.many().multicast().onBackpressureBuffer(10);
        var latch = new CountDownLatch(1);
        // sinkはPublisherではありませんが、以下の仕組みでFluxに変換出来ます。
        sink.asFlux().parallel(2, 3).runOn(Schedulers.parallel()).doOnComplete(() -> latch.countDown())
                .subscribe(d -> __logger.info("data=<{}>", d));
        for (int i = 0; i < 10; i++) {
            sink.tryEmitNext("data_" + i);
        }
        sink.tryEmitComplete();
        try {
            Assertions.assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
        }

結論

自分でPublisherを実装してFluxのフロー制御の動作を確認しました。Fluxでフロー制御を正しく行うためには、Publisher側でrequestを正しく実装する必要があります。この辺を簡単にするために、Reactor Coreでは、Sinkという仕組みが用意されているようです。