Is there a way to publish data to subscribers so that only one subscriber receives each item? What I am trying to achieve is a publisher/subscriber model that works as a queue with multiple readers but one publisher. Once the publisher publishes data, the first subscriber that receives it should be the only one that processes it.
Thanks in advance!
submit. Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable.
In the publish/subscribe domain, message producers are called publishers and message consumers are called subscribers. They exchange messages by means of a destination called a topic: publishers produce messages to a topic; subscribers subscribe to a topic and consume messages from a topic.
Subscribing Method: AMPS creates a background thread that receives messages and copies them into the MessageStream that you iterate over. This means that the client application as a whole can continue to receive messages while you are doing processing work.
A Publisher is a provider of an unbounded number of sequenced elements publishing them according to demand received from its Subscribers. Publisher<T> interface is responsible for publishing elements of type T and provides a subscribe() method for subscribers to connect to it.
In reactive streams (at least, in their java.util.concurrent.Flow incarnation), subscribers just ask for data, and only the publisher is in control of how to publish that data.
The only general-purpose implementation of Flow.Publisher that exists in Java 9 is SubmissionPublisher, which follows the standard pub/sub approach of publishing every submitted item to all subscribers. I did not find any easy way to hack SubmissionPublisher to make it publish to only one subscriber.
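To make the broadcast behaviour concrete, here is a minimal sketch using only the JDK's SubmissionPublisher. The names BroadcastDemo, RecordingSubscriber and run are made up for this illustration. Each subscriber only signals demand with request(1); the publisher decides what to do with the item, and it hands every item to both subscribers.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BroadcastDemo {
    /** Hypothetical helper: records every item it receives, requesting one at a time. */
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // the subscriber only signals demand
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Submits the items to a SubmissionPublisher with two subscribers and returns both. */
    static List<RecordingSubscriber> run(List<String> items) throws InterruptedException {
        RecordingSubscriber a = new RecordingSubscriber();
        RecordingSubscriber b = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(a);
            publisher.subscribe(b);
            items.forEach(publisher::submit);
        } // close() signals onComplete once pending items have been delivered
        a.done.await(5, TimeUnit.SECONDS);
        b.done.await(5, TimeUnit.SECONDS);
        return List.of(a, b);
    }

    public static void main(String[] args) throws InterruptedException {
        for (RecordingSubscriber s : run(List.of("x", "y", "z"))) {
            System.out.println(s.received);
        }
    }
}
```

Both subscribers end up with all three items, which is exactly what the question wants to avoid.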
But you could try to write your own Flow.Publisher implementation, something like this:
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class QueueLikePublisher<T> implements Publisher<T> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private final List<QueueLikeSubscription<? super T>> subscriptions = new CopyOnWriteArrayList<>();

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        // subscribing: adding a new subscription to the list
        QueueLikeSubscription<? super T> subscription = new QueueLikeSubscription<>(subscriber, executor);
        subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    public void submit(T item) {
        // we got some data: look for a non-completed subscription with demand
        // and give it the data item; the item is dropped if there is none
        for (QueueLikeSubscription<? super T> subscription : subscriptions) {
            if (subscription.tryClaim()) {
                // we give it to just one subscriber; the offer() call probably
                // needs to be wrapped in a try/catch
                subscription.offer(item);
                break;
            }
        }
    }

    static class QueueLikeSubscription<T> implements Subscription {
        private final Subscriber<? super T> subscriber;
        private final ExecutorService executor;
        private long demand = 0;           // guarded by this
        private boolean completed = false; // guarded by this

        QueueLikeSubscription(Subscriber<? super T> subscriber,
                              ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        @Override
        public synchronized void request(long n) {
            if (completed) {
                return;
            }
            if (n <= 0) {
                // a non-positive request is an error per the Flow.Subscription contract
                completed = true;
                IllegalArgumentException ex = new IllegalArgumentException("non-positive request: " + n);
                executor.execute(() -> subscriber.onError(ex));
            } else {
                // just extending the demand, saturating on overflow
                demand += n;
                if (demand < 0) {
                    demand = Long.MAX_VALUE;
                }
            }
        }

        @Override
        public synchronized void cancel() {
            completed = true;
        }

        // atomically consumes one unit of demand; false if cancelled or no demand
        synchronized boolean tryClaim() {
            if (completed || demand == 0) {
                return false;
            }
            demand--;
            return true;
        }

        Future<?> offer(T item) {
            return executor.submit(() -> {
                try {
                    subscriber.onNext(item);
                } catch (Exception e) {
                    cancel(); // no further items after a failure
                    subscriber.onError(e);
                }
            });
        }
    }
}
It publishes the item to the first subscriber that is not yet completed (for example, cancelled) and that has non-zero demand.
Please note that this code is just an outline for illustrative purposes to demonstrate the idea. For example, it should probably contain more exception handling (like handling the RejectedExecutionException).
The case when only one subscriber should receive each data item is common. For example, the subscribers can be database connections, the data items requests to a database, and the publisher the central entry point to the whole connection pool. It is a different data exchange protocol, so using interfaces from j.u.c.Flow may be confusing.
Generally, those interfaces can be used for this master-worker protocol, but there is a subtle but important difference: subscribers should not request more than one data item at a time. Otherwise, one worker can take several items while other workers sit without work. So the method Subscription#request() can be removed from the interface. It is assumed that by the act of subscribing, the subscriber agrees to accept one data item. As soon as that item is submitted to the subscriber, the subscriber is unsubscribed. This avoids scanning the list of subscriptions to find an acceptable subscriber (as in the @Roman Puchkovskiy implementation); the next data item is simply submitted to the first subscribed subscriber. As soon as a subscriber needs more data, it subscribes again. This is exactly how worker threads in a thread pool request their next tasks.
Since the method cancel() remains the only method in Subscription, we can replace it with a new method Publisher#cancel(Subscriber) and eliminate the Subscription interface entirely. The method Subscriber#onSubscribe(Subscription) is then replaced with the method Subscriber#onSubscribe(Publisher).
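The simplified protocol described above could be sketched roughly as follows. This is not the PickPoint class or any existing API: OneShotDemo, Worker, OneShotPublisher and RecordingWorker are hypothetical names. The sketch makes a few assumptions of its own: items submitted while no worker is waiting are queued, delivery happens synchronously on the calling thread, and each worker gets the publisher through its constructor instead of an onSubscribe(Publisher) callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class OneShotDemo {
    /** Hypothetical worker-side interface: receives exactly one item per subscription. */
    interface Worker<T> {
        void onNext(T item);
    }

    /** Hypothetical publisher: each item goes to the longest-waiting worker. */
    static class OneShotPublisher<T> {
        private final Queue<Worker<? super T>> waiting = new ArrayDeque<>();
        private final Queue<T> pending = new ArrayDeque<>(); // assumption: buffer unclaimed items

        /** Registers the worker for exactly one item; it must subscribe again for more. */
        void subscribe(Worker<? super T> worker) {
            T item;
            synchronized (this) {
                item = pending.poll();
                if (item == null) {
                    waiting.add(worker);
                    return;
                }
            }
            worker.onNext(item); // deliver outside the lock so the worker may resubscribe
        }

        /** Withdraws a waiting worker; returns false if it was not waiting. */
        synchronized boolean cancel(Worker<? super T> worker) {
            return waiting.remove(worker);
        }

        /** Hands the item to one waiting worker, or queues it if nobody is waiting. */
        void submit(T item) {
            Worker<? super T> worker;
            synchronized (this) {
                worker = waiting.poll(); // polling the queue unsubscribes the worker
                if (worker == null) {
                    pending.add(item);
                    return;
                }
            }
            worker.onNext(item);
        }
    }

    /** Worker that logs each item and then subscribes again for the next one. */
    static class RecordingWorker implements Worker<String> {
        private final String name;
        private final List<String> log;
        private final OneShotPublisher<String> publisher;

        RecordingWorker(String name, List<String> log, OneShotPublisher<String> publisher) {
            this.name = name;
            this.log = log;
            this.publisher = publisher;
        }

        @Override
        public void onNext(String item) {
            log.add(name + ":" + item);
            publisher.subscribe(this); // ready for more work
        }
    }

    static List<String> run() {
        OneShotPublisher<String> publisher = new OneShotPublisher<>();
        List<String> log = new ArrayList<>();
        publisher.subscribe(new RecordingWorker("A", log, publisher));
        publisher.subscribe(new RecordingWorker("B", log, publisher));
        publisher.submit("1");
        publisher.submit("2");
        publisher.submit("3");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [A:1, B:2, A:3]
    }
}
```

Because a worker leaves the waiting queue as soon as it is handed an item, no worker can hold more than one item at a time, and submit() never has to scan for a subscriber with demand.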
I am developing an asynchronous library (it is not yet of production quality) which contains the solution for your use case: class PickPoint.