I´ve been playing with Java Flow offer
operator but after read the documentation and do my test I dont understand.
Here my test
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
The offer operator receive an item to be emitted and a BiPredicate function, and as far as I understand reading the documentation, only in case that the predicate function is true the item it will be emitted.
Bur after pass the test the result is
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
There´s no change in the result if instead of false I return true.
Anybody can explain me this operator a little bit better please.
offer. Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method.
In the publish/subscribe domain, message producers are called publishers and message consumers are called subscribers. They exchange messages by means of a destination called a topic: publishers produce messages to a topic; subscribers subscribe to a topic and consume messages from a topic.
A Publisher interface is a provider of an unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). In response to call Publisher. subscribe(Subscriber), the possible invocation sequences for methods on the Subscriber.
Flow API is official support for reactive streams specification since Java 9. It is a combination of both Iterator and Observer patterns. The Flow API is an interoperation specification and not an end-user API like RxJava.
Nope, the predicate function is used to decide whether to retry the publishing operation as mentioned in the docs:
onDrop
- if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
It does not affect whether or not the item is to be sent initially.
EDIT: An example of how drops can occur when using the offer
method
I came up with an example of how drops could occur when calling the offer
method. I don't think the output is 100% deterministic, but there is a clear difference when it is run several times. You can just change the handler to return true instead of false, to see how the retry reduces the drops due to saturated buffers. In this example, the drop would typically occur because the max buffer capacity is explicitly small (passed to the constructor of SubmissionPublisher
). But when the retry is enabled after a small sleep period, the drops are removed:
public class SubmissionPubliserDropTest {
public static void main(String[] args) throws InterruptedException {
// Create Publisher for expected items Strings
// Note the small buffer max capacity to be able to cause drops
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if(result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
publisher.close();
}
}
class CustomSubscriber<T> implements Flow.Subscriber<T> {
private Subscription sub;
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable th) {
th.printStackTrace();
sub.cancel();
}
@Override
public void onNext(T arg0) {
System.out.println("Got : " + arg0 + " --> onNext() callback");
sub.request(1);
}
@Override
public void onSubscribe(Subscription sub) {
System.out.println("Subscription done");
this.sub = sub;
sub.request(1);
}
}
SubmissionPublisher.offer
states that
The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns
true
, retried once.
Just to understand, in both of your calls
publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked
publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked
But still the publisher
publishes the given item, to each of its current subscriber. which happens in your current scenario.
The scenario to validate if the handler that you've provided is getting invoked or not by trying to reproduce is tough in terms of resource limitations, as the doc suggests:
The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once.
Yet you can try dropping the items with timeouts set to base minimum using
the overloaded method for offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
timeout
- how long to wait for resources for any subscriber before giving up, in units of unit
unit
- a TimeUnit determining how to interpret the timeout parameter
Since the offer
methods may drop items (either immediately or with bounded timeout), which would provide an opportunity to interpose a handler and then retry.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With