Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Publishing data on java 9 Flow to subscribers in a way that only one subscriber will consume it

Is there a way to publish data to subscribers in a way that only one subscriber will receive it ? What i am trying to achieve is that the subscriber publisher model will work as a queue that has multiple readers but one publisher. Once the publisher will publish data, the first subscriber that receives it, will be the only one that will process it.

Thanks in advance !!!

like image 996
Roy Kornviets Avatar asked May 01 '18 07:05

Roy Kornviets


People also ask

Which method of the submission publisher class publishes the given item to each current subscriber?

submit. Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable.

What is publisher and subscriber in Java?

In the publish/subscribe domain, message producers are called publishers and message consumers are called subscribers. They exchange messages by means of a destination called a topic: publishers produce messages to a topic; subscribers subscribe to a topic and consume messages from a topic.

How does subscribe work in Java?

Subscribing MethodAMPS creates a background thread that receives messages and copies them into the MessageStream that you iterate over. This means that the client application as a whole can continue to receive messages while you are doing processing work.

What is publisher in Java?

A Publisher is a provider of an unbounded number of sequenced elements publishing them according to demand received from its Subscribers. Publisher<T> interface is responsible for publishing elements of type T and provides a subscribe() method for subscribers to connect to it.


2 Answers

In reactive streams (at least, in their java.util.concurrent.Flow incarnation), subscribers just ask for data, and only the publisher is in control of how to publish that data.

The only general purpose implementation of Flow.Publisher that exists in Java 9 is SubmissionPublisher that follows the standard pub/sub way of publishing any published item to all the subscribers. I did not find any easy way to hack SubmissionPublisher to make it only publish to one subscriber.

But you could try to write your own Flow.Publisher implementation, something like this:

class QueueLikePublisher<T> implements Publisher<T> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private List<QueueLikeSubscription<? super T>> subscriptions = new CopyOnWriteArrayList<>();

    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        // subscribing: adding a new subscription to the list
        QueueLikeSubscription<? super T> subscription = new QueueLikeSubscription<>(subscriber, executor);
        subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    public void submit(T item) {
        // we got some data: looking for non-completed and demanding
        // subscription and give it the data item

        for (QueueLikeSubscription<? super T> subscription : subscriptions) {
            if (!subscription.completed && subscription.demand > 0) {
                subscription.offer(item);
                // we just give it to one subscriber; probaly offer() call needs
                // to be wrapped in a try/catch
                break;
            }
        }
    }

    static class QueueLikeSubscription<T> implements Subscription {
        private final Subscriber<? super T> subscriber;
        private final ExecutorService executor;
        volatile int demand = 0;
        volatile boolean completed = false;

        QueueLikeSubscription(Subscriber<? super T> subscriber,
                ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        public synchronized void request(long n) {
            if (n != 0 && !completed) {
                if (n < 0) {
                    IllegalArgumentException ex = new IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                } else {
                    // just extending the demand
                    demand += n;
                }
            }
        }

        public synchronized void cancel() {
            completed = true;
        }

        Future<?> offer(T item) {
            return executor.submit(() -> {
                try {
                    subscriber.onNext(item);
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            });
        }
    }
}

It publishes the item to the first subscriber that is not yet completed (for example, cancelled) and that has non-zero demand.

Please note that this code is just an outline for illistrative purposes to demonstrate the idea. For example, it should probably contain more exception handling (like handling the RejectedExecutionException).

like image 198
Roman Puchkovskiy Avatar answered Oct 18 '22 15:10

Roman Puchkovskiy


The case when only one subscriber should receive every data item is common. For example, subscribers can be database connections and data items - requests to a database, and the publisher - the central entry point to the whole connection pool. It is a different data exchange protocol, so using interfaces from j.u.c.Flow may be confusing.

Generally, those interfaces can be used for this master-worker protocol, but there is a subtle but important difference: subscribers should not request more than one data item at a time. Otherwise, one worker can take several items while other workers would sit without work. So the method Subscription#request() can be removed from interface. It is assumed that by the act of subscribing, the subscriber agrees to accept one data item. As soon as that item is submitted to the subscriber, subscriber is unsubscribed. This allows not to scan the list of subscriptions trying to find acceptable subscriber (as in the @Roman Puchkovskiy implementation) but submit next data item to the first subscribed subscriber. As soon as subscriber needs more data, it subscribes again. This is exactly how worker threads in a thread pool request next tasks.

Since the method cancel() remains the only method in Subscription, we can replace it with new method Publisher#cancel(Subscriber) and eliminate Subscription interface entirely. The method Subscriber#onSubscribe(Subscription) is replaced then with the methodSubscriber#onSubscribe(Publisher).

I am developing an asynchronous library (it is not yet of production quality) which contains the solution for your use case: class PickPoint.

like image 21
Alexei Kaigorodov Avatar answered Oct 18 '22 13:10

Alexei Kaigorodov