Java 9 Behavior of Flow SubmissionPublisher offer method

I've been playing with the Java Flow offer operator, but after reading the documentation and running my test I still don't understand it.

Here is my test:

@Test
public void offer() throws InterruptedException {
    //Create Publisher for expected items Strings
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.offer("item", (subscriber, value) -> false);
    Thread.sleep(500);
}

The offer operator receives an item to be emitted and a BiPredicate function. As far as I understand from the documentation, the item is emitted only if the predicate function returns true.

But after running the test, the result is:

Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback

There's no change in the result if I return true instead of false.

Can anybody explain this operator a little better, please?

paul asked Sep 30 '17 11:09


2 Answers

No. The predicate function is only used to decide whether to retry the publishing operation after a drop, as mentioned in the docs:

onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)

It does not affect whether or not the item is to be sent initially.
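The no-drop case from the question can be checked with a minimal sketch. The class OfferNoDropDemo, the method offerOnce and the call counter are hypothetical names introduced for illustration. The sketch assumes the default constructor's buffer (Flow.defaultBufferSize() slots per subscriber) is large enough that a single item is never dropped.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the question's code.
class OfferNoDropDemo {

    // Offers a single item with an onDrop handler that returns handlerResult.
    // Returns { value returned by offer, number of times the handler ran }.
    static int[] offerOnce(boolean handlerResult) {
        AtomicInteger handlerCalls = new AtomicInteger();
        // Default constructor: ample buffer, so one item should never be dropped.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override
                public void onNext(String item) { }
                @Override
                public void onError(Throwable t) { }
                @Override
                public void onComplete() { }
            });
            int result = publisher.offer("item", (subscriber, value) -> {
                handlerCalls.incrementAndGet(); // runs only if the item was dropped
                return handlerResult;
            });
            return new int[] { result, handlerCalls.get() };
        }
    }

    public static void main(String[] args) {
        for (boolean b : new boolean[] { true, false }) {
            int[] r = offerOnce(b);
            System.out.println("handler returns " + b
                    + ": offer result = " + r[0] + ", handler calls = " + r[1]);
        }
    }
}
```

In both runs the handler call count should stay at zero and offer should return a non-negative value, which matches the identical output the question observed for true and false.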

EDIT: An example of how drops can occur when using the offer method

Here is an example of how drops can occur when calling the offer method. The output is not fully deterministic, but there is a clear difference over several runs. Change the handler to return true instead of false to see how the retry reduces the drops caused by saturated buffers. In this example, drops typically occur because the maximum buffer capacity passed to the SubmissionPublisher constructor is deliberately small. When the retry is enabled after a short sleep, the drops typically disappear:

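import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherDropTest {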

    public static void main(String[] args) throws InterruptedException {
        // Create Publisher for expected items Strings
        // Note the small buffer max capacity to be able to cause drops
        SubmissionPublisher<String> publisher =
                               new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
        // Register Subscriber
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        // publish 3 items for each subscriber
        for(int i = 0; i < 3; i++) {
            int result = publisher.offer("item" + i, (subscriber, value) -> {
                // sleep for a small period before deciding whether to retry or not
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return false;  // you can switch to true to see that drops are reduced
            });
            // a negative result is the negated number of drops for this item
            if(result < 0) {
                System.err.println("dropped: " + (-result));
            }
        }
        Thread.sleep(3000);
        publisher.close();
    }
}

class CustomSubscriber<T> implements Flow.Subscriber<T> {

    private Subscription sub;

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    @Override
    public void onError(Throwable th) {
        th.printStackTrace();
        sub.cancel();
    }

    @Override
    public void onNext(T arg0) {
        System.out.println("Got : " + arg0 + " --> onNext() callback");
        sub.request(1);
    }

    @Override
    public void onSubscribe(Subscription sub) {
        System.out.println("Subscription done");
        this.sub = sub;
        sub.request(1);
    }

}
M A answered Oct 18 '22 21:10


The documentation for SubmissionPublisher.offer states:

The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once.

To make this concrete, consider both of your calls:

publisher.offer("item", (subscriber, value) -> true); // if invoked on a drop, the offer is retried once

publisher.offer("item", (subscriber, value) -> false); // if invoked on a drop, the offer is not retried

In both cases the handler runs only when an item is dropped. When nothing is dropped, the publisher still publishes the given item to each of its current subscribers, which is what happens in your scenario.


Verifying that the handler you provided is actually invoked is hard, because you have to reproduce the resource limits under which, as the documentation quoted above says, an item is dropped.

You can, however, try to force drops by setting the timeout to the bare minimum with the overloaded method offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop):

timeout - how long to wait for resources for any subscriber before giving up, in units of unit

unit - a TimeUnit determining how to interpret the timeout parameter

The offer methods may drop items (either immediately or with a bounded timeout), which provides an opportunity to interpose a handler and then retry.
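The timed overload can be exercised with a rough sketch like the one below. TimedOfferDemo and its run method are hypothetical names introduced for illustration. The sketch assumes a subscriber that never requests items, so the small buffer (capacity 2, as in the other answer) saturates and later offers time out and are dropped.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original answer.
class TimedOfferDemo {

    // Offers `items` items, waiting up to timeoutMillis for space on each one.
    // Returns { total drops reported by offer, number of onDrop handler calls }.
    static int[] run(int items, long timeoutMillis) {
        AtomicInteger handlerCalls = new AtomicInteger();
        int drops = 0;
        // Small maximum buffer capacity so that the buffer fills up quickly.
        try (SubmissionPublisher<String> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2)) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription s) { /* never requests */ }
                @Override
                public void onNext(String item) { }
                @Override
                public void onError(Throwable t) { }
                @Override
                public void onComplete() { }
            });
            for (int i = 0; i < items; i++) {
                int result = publisher.offer("item" + i, timeoutMillis, TimeUnit.MILLISECONDS,
                        (subscriber, value) -> {
                            handlerCalls.incrementAndGet();
                            return false; // do not retry; count the drop
                        });
                if (result < 0) {
                    drops += -result; // negative result is the negated drop count
                }
            }
        }
        return new int[] { drops, handlerCalls.get() };
    }

    public static void main(String[] args) {
        int[] r = run(50, 5);
        System.out.println("drops = " + r[0] + ", handler calls = " + r[1]);
    }
}
```

Because nothing is ever consumed, each offer made after the buffer is full waits for the timeout, invokes the handler and is then reported as a drop. Returning true from the handler would only add one more attempt, which fails here for the same reason.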

Naman answered Oct 18 '22 22:10