I was trying out some of the new features in Java 9, so I put together a test with a publisher emitting numbers at a given rate. I also implemented a Subscriber to listen to those publications and just print them to the console.
I might not fully understand how to use this API, though, because the onNext() method is not printing anything and getLastItem() only returns 0.
The only part that seems to work is onSubscribe(), which correctly initialises the lastItem variable.
@Test
public void testReactiveStreams(){
    //Create Publisher
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
    publisher.subscribe(subscriber);
    assertTrue(publisher.hasSubscribers());
    //Publish items
    System.out.println("Publishing Items...");
    List.of(1,2,3,4,5).stream().forEach(i -> {
        publisher.submit(i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // blah
        }
    });
    assertEquals(5, subscriber.getLastItem());
    publisher.close();
}
private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {
    private int lastItem;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed");
        lastItem = 0;
    }
    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
    }
    @Override
    public void onError(Throwable throwable) {
        // nothing for the moment
    }
    @Override
    public void onComplete() {
        System.out.println("Completed");
    }
    public int getLastItem(){
        return lastItem;
    }
}
Can someone tell me what I am doing wrong in my test, please? I would expect the test to print those numbers and return 5 as the last item.
I have to say I have only used Observables and Subjects in Angular2 so far, although they seem easier to understand.
The Flow API implements a feature called backpressure (explained here in the context of RxJava), which means the publisher should not be able to overwhelm the subscriber by publishing items faster than it can process them. The way JDK 9 implements that is by having the subscriber request items from the subscription.
For your test, the TestIntegerSubscriber should request items in onSubscribe, let's say 10, and keep track of how often onNext has been called, so it can request more once the 10 items were pushed.
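To make that concrete, here is a minimal sketch of such a subscriber, not a definitive implementation. The class names RequestingSubscriberDemo and RequestingSubscriber, the BATCH_SIZE constant, and the CountDownLatch used to wait for completion are my own assumptions and not part of your test. I used a latch instead of Thread.sleep because SubmissionPublisher delivers items asynchronously (by default on the common ForkJoinPool), so the calling thread needs some way to wait before checking the last item.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names here are illustrative.
public class RequestingSubscriberDemo {

    /** Subscriber that requests items in batches of BATCH_SIZE. */
    static class RequestingSubscriber implements Flow.Subscriber<Integer> {
        private static final int BATCH_SIZE = 10; // "let's say 10"

        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int receivedInBatch;
        private volatile int lastItem;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Store the subscription and signal demand; without this
            // request, onNext is never called.
            this.subscription = subscription;
            subscription.request(BATCH_SIZE);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("Received : " + item);
            lastItem = item;
            // Once the current batch is used up, ask for the next one.
            if (++receivedInBatch == BATCH_SIZE) {
                receivedInBatch = 0;
                subscription.request(BATCH_SIZE);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            done.countDown();
        }

        @Override
        public void onComplete() {
            System.out.println("Completed");
            done.countDown();
        }

        /** Blocks until onComplete or onError, then returns the last item seen. */
        int awaitLastItem() {
            try {
                if (!done.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Timed out waiting for completion");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return lastItem;
        }
    }

    /** Publishes 1..5 and returns the last item the subscriber received. */
    static int publishOneToFive() {
        RequestingSubscriber subscriber = new RequestingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            List.of(1, 2, 3, 4, 5).forEach(publisher::submit);
        } // close() signals onComplete to the subscriber
        return subscriber.awaitLastItem();
    }

    public static void main(String[] args) {
        System.out.println("Last item: " + publishOneToFive());
    }
}
```

In your test, the equivalent change would be to call subscription.request(...) in onSubscribe and to close the publisher and wait for onComplete before asserting on getLastItem().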
I wrote a section about the Flow API that goes into a little more detail. It also describes the interaction between publisher, subscriber, and subscription:
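1. Create a Publisher and a Subscriber.
2. Subscribe the subscriber with Publisher::subscribe.
3. The publisher creates a Subscription and calls Subscriber::onSubscribe with it so the subscriber can store the subscription.
4. At some point the subscriber calls Subscription::request to request a number of items.
5. The publisher starts handing items to the subscriber by calling Subscriber::onNext. It will never publish more than the requested number of items.
6. The publisher might at some point be depleted or run into trouble and call Subscriber::onComplete or Subscriber::onError, respectively.
7. The subscriber might either continue to request more items every now and then or cut the connection by calling Subscription::cancel.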
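The interaction described above can be sketched roughly as follows. This is only an illustration under my own assumptions: FlowLifecycleDemo, OneAtATimeSubscriber, and the limit of three items are hypothetical names and values chosen for the example. The subscriber requests one item at a time and cancels after three, so items 4 and 5 are never delivered even though the publisher accepted them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names here are illustrative.
public class FlowLifecycleDemo {

    /** Requests one item at a time and cancels after `limit` items. */
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        private final int limit;
        private final List<Integer> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        OneAtATimeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // store the subscription
            subscription.request(1);          // request the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel();        // cut the connection
                finished.countDown();         // no onComplete follows a cancel
            } else {
                subscription.request(1);      // ask for exactly one more
            }
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }

        /** Waits until the subscriber is finished and returns what it received. */
        List<Integer> awaitReceived() {
            try {
                if (!finished.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Timed out waiting for subscriber");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return List.copyOf(received);
        }
    }

    /** Publishes 1..5 to a subscriber that cancels after three items. */
    static List<Integer> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber(3);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            List.of(1, 2, 3, 4, 5).forEach(publisher::submit);
        }
        return subscriber.awaitReceived();
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run());
    }
}
```

Requesting a single item per onNext call is the most conservative form of backpressure; requesting larger batches, as suggested for your test, trades some of that control for fewer request calls.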