Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flow programming: subscriber and publisher to keep track of count?

I came over an article regarding the new Flow related interfaces in Java9. Example code from there:

public class MySubscriber<T> implements Subscriber<T> {  
  private Subscription subscription;    
  @Override  
  public void onSubscribe(Subscription subscription) {  
    this.subscription = subscription;  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
  }        
  @Override  
  public void onNext(T item) {  
    System.out.println("Got : " + item);  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
   }  

As you can see, onNext() requests one new item to be pushed.

Now I am wondering:

  • if onSubscribe() requested, say 5 items
  • and after the first item gets delivered, request(1) is called like above

Is the server now expected to send

  • five items ( 5 requested. -1 sent. +1 requested)
  • or one item (because the previous request gets "discarded" by that new request)

In other words: when request() is called several times, do those numbers add up; or are previous requests "discarded"?

Leading to the question title - whether the subscriber needs to keep track about received items, in order to avoid requesting "too many" items at some point.

like image 242
GhostCat Avatar asked Jul 19 '17 14:07

GhostCat


People also ask

Can a subscriber also be a publisher?

In terms of Reactive Streams, there is a Publisher and it could have as many Subscribers.

What is publisher and subscriber in reactive programming?

The Publish/Subscribe Pattern is a Bidirectional flow where the emitters of the events, called the Publishers, multicast events to a number of registered recipients, called the Subscribers. This is different from Observer Pattern, the key difference is that Subscribers don't know about the existence of the Publisher.

When the onNext () method of the subscriber is called?

After a Subscriber calls an Observable 's subscribe method, the Observable calls the Subscriber's Observer. onNext(T) method to emit items. A well-behaved Observable will call a Subscriber's Observer. onCompleted() method exactly once or the Subscriber's Observer.

How do I subscribe to flux?

Subscribers. To make the data flow you have to subscribe to the Flux using one of the subscribe() methods. Only those methods make the data flow. They reach back through the chain of operators you declared on your sequence (if any) and request the publisher to start creating data.


2 Answers

As Sotirios points out, the request method's Javadoc states (emphasis mine):

Adds the given number n of items to the current unfulfilled demand for this subscription. If n is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up to n additional onNext invocations (or fewer if terminated).

So the answer is clearly yes, the subscriber needs to keep track of items. In fact, that's the whole point of the mechanism. Some background: The request method is meant to allow the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". It is hence the subscriber's (and only its) task to carefully vet when and how many new items to request. In that line it can not "reconsider" and lower the number of items to receive.

Lowering the number would also make the communication between publisher and subscriber "non-monotonic" in the sense that the number of totally requested items could suddenly lower (as it stands it can only increase). This is not only annoying in an abstract sense, it poses concrete consistency problems: The publisher could be in the process of delivering a few items when the subscriber suddenly drops the number of requested items to 1 - what now?

like image 181
Nicolai Parlog Avatar answered Oct 19 '22 21:10

Nicolai Parlog


Although Java 9 does not implement the Reactive Streams API, it offers the nearly same API and defines the corresponding behaviour.

And in the Reactive Streams specification this adding up is defined by the following:

For the Publisher in 1.1:

The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.

And for the Subscription in 3.8:

While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.

So Java adheres to the specification that was given in the Reactive Streams API.

like image 23
P.J.Meisch Avatar answered Oct 19 '22 20:10

P.J.Meisch