I came over an article regarding the new Flow
related interfaces in Java9. Example code from there:
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
As you can see, onNext()
requests one new item to be pushed.
Now I am wondering:
onSubscribe()
requested, say 5 items request(1)
is called like aboveIs the server now expected to send
In other words: when request()
is called several times, do those numbers add up; or are previous requests "discarded"?
Leading to the question title - whether the subscriber needs to keep track about received items, in order to avoid requesting "too many" items at some point.
In terms of Reactive Streams, there is a Publisher and it could have as many Subscribers.
The Publish/Subscribe Pattern is a Bidirectional flow where the emitters of the events, called the Publishers, multicast events to a number of registered recipients, called the Subscribers. This is different from Observer Pattern, the key difference is that Subscribers don't know about the existence of the Publisher.
After a Subscriber calls an Observable 's subscribe method, the Observable calls the Subscriber's Observer. onNext(T) method to emit items. A well-behaved Observable will call a Subscriber's Observer. onCompleted() method exactly once or the Subscriber's Observer.
Subscribers. To make the data flow you have to subscribe to the Flux using one of the subscribe() methods. Only those methods make the data flow. They reach back through the chain of operators you declared on your sequence (if any) and request the publisher to start creating data.
As Sotirios points out, the request
method's Javadoc states (emphasis mine):
Adds the given number
n
of items to the current unfulfilled demand for this subscription. Ifn
is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up ton
additional onNext invocations (or fewer if terminated).
So the answer is clearly yes, the subscriber needs to keep track of items. In fact, that's the whole point of the mechanism. Some background: The request
method is meant to allow the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". It is hence the subscriber's (and only its) task to carefully vet when and how many new items to request. In that line it can not "reconsider" and lower the number of items to receive.
Lowering the number would also make the communication between publisher and subscriber "non-monotonic" in the sense that the number of totally requested items could suddenly lower (as it stands it can only increase). This is not only annoying in an abstract sense, it poses concrete consistency problems: The publisher could be in the process of delivering a few items when the subscriber suddenly drops the number of requested items to 1 - what now?
Although Java 9 does not implement the Reactive Streams API, it offers the nearly same API and defines the corresponding behaviour.
And in the Reactive Streams specification this adding up is defined by the following:
For the Publisher in 1.1:
The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.
And for the Subscription in 3.8:
While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
So Java adheres to the specification that was given in the Reactive Streams API.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With