Receiving items from reactive stream SubmissionPublisher

I was trying out some of the new features in Java 9, so I put together a test with a publisher that emits numbers at a given rate. I also implemented a Subscriber that listens for those items and just prints them to the console.

However, I might not fully understand how to use this API, because the onNext() method is not printing anything and getLastItem() only returns 0.

The only part that seems to work is the onSubscribe() which correctly initialises the lastItem variable.

@Test
public void testReactiveStreams(){
    //Create Publisher
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //Register Subscriber
    TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
    publisher.subscribe(subscriber);

    assertTrue(publisher.hasSubscribers());

    //Publish items
    System.out.println("Publishing Items...");

    List.of(1,2,3,4,5).stream().forEach(i -> {
        publisher.submit(i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // blah
        }
    });
    assertEquals(5, subscriber.getLastItem());

    publisher.close();
}


private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {

    private int lastItem;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed");
        lastItem = 0;
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
    }

    @Override
    public void onError(Throwable throwable) {
        // nothing for the moment
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }

    public int getLastItem(){
        return lastItem;
    }
}

Can someone tell me what I am doing wrong in my test, please? I would expect the test to print those numbers and return 5 as the last item.

I should say that so far I have only used Observables and Subjects in Angular2, and they seem easier to understand.

nuvio asked Mar 14 '17 18:03


1 Answer

The Flow API implements a feature called backpressure (explained here in the context of RxJava), which means the publisher must not be able to overwhelm the subscriber by publishing items faster than the subscriber can process them. JDK 9 implements that by having the subscriber request items from the subscription.

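In your test the subscriber never calls Subscription::request, so the publisher never hands it anything and onNext is never invoked. The TestIntegerSubscriber should store the subscription and request items in onSubscribe, let's say 10, and keep track of how often onNext has been called, so it can request more once those 10 items have been pushed.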
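
As a rough sketch of that idea, here is a subscriber that requests 10 items up front and asks for another 10 whenever a batch is used up. The names BatchingDemo, BatchingSubscriber, run and awaitItems are made up for this illustration and are not part of the JDK. Delivery happens asynchronously, so the sketch assumes it is acceptable to wait on a CountDownLatch for onComplete rather than relying on Thread.sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BatchingDemo {

    // Hypothetical subscriber (not a JDK class): requests items in batches of BATCH_SIZE.
    static class BatchingSubscriber implements Flow.Subscriber<Integer> {
        static final int BATCH_SIZE = 10; // the "let's say 10" from the answer

        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int leftInBatch;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            leftInBatch = BATCH_SIZE;
            subscription.request(BATCH_SIZE); // without a request, onNext is never called
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--leftInBatch == 0) {         // batch used up: ask for the next one
                leftInBatch = BATCH_SIZE;
                subscription.request(BATCH_SIZE);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }

        // Waits (up to 5 seconds) for the stream to end, then returns the received items.
        List<Integer> awaitItems() {
            try {
                finished.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return received;
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> run(int count) {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the submitted items have been delivered
        return subscriber.awaitItems();
    }

    public static void main(String[] args) {
        System.out.println(run(25));
    }
}
```

In a test like yours, the assertion on the last item would go after the wait for completion, because submit returns before the subscriber has necessarily seen the item.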

I wrote a section about the Flow API that goes into a little more detail. It also describes the interaction between publisher, subscriber, and subscription:

  1. Create a Publisher and a Subscriber.
  2. Subscribe the subscriber with Publisher::subscribe.
  3. The publisher creates a Subscription and calls Subscriber::onSubscribe with it so the subscriber can store the subscription.
  4. At some point the subscriber calls Subscription::request to request a number of items.
  5. The publisher starts handing items to the subscriber by calling Subscriber::onNext. It will never publish more than the requested number of items.
  6. The publisher might at some point be depleted or run into trouble and call Subscriber::onComplete or Subscriber::onError, respectively.
  7. The subscriber might either continue to request more items every now and then or cut the connection by calling Subscription::cancel.
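
The steps above can be traced in a small program. This is only a sketch: FlowLifecycleDemo, OneByOneSubscriber, run and the event strings are hypothetical names chosen for the illustration. The subscriber asks for one item at a time and cancels after a fixed number of items, so the recorded events show that nothing is delivered beyond what was requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowLifecycleDemo {

    // Hypothetical subscriber: takes one item at a time and cancels after `limit` items.
    static class OneByOneSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;
        private int seen;

        OneByOneSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // step 3: store the subscription
            events.add("subscribed");
            subscription.request(1);          // step 4: request the first item
        }

        @Override
        public void onNext(String item) {     // step 5: the publisher hands over an item
            events.add("next:" + item);
            if (++seen == limit) {
                subscription.cancel();        // step 7: cut the connection
                events.add("cancelled");
                done.countDown();
            } else {
                subscription.request(1);      // step 7: or keep requesting
            }
        }

        @Override
        public void onError(Throwable throwable) { // step 6: publisher ran into trouble
            events.add("error");
            done.countDown();
        }

        @Override
        public void onComplete() {            // step 6: publisher is depleted
            events.add("completed");
            done.countDown();
        }
    }

    // Publishes the given items and returns the events the subscriber recorded.
    static List<String> run(int limit, String... items) {
        OneByOneSubscriber subscriber = new OneByOneSubscriber(limit); // step 1
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);                           // step 2
            for (String item : items) {
                publisher.submit(item);
            }
        } // closing the publisher is what eventually triggers onComplete
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run(2, "a", "b", "c")); // subscriber cancels before "c"
        System.out.println(run(5, "a"));           // publisher completes first
    }
}
```
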
Nicolai Parlog answered Oct 13 '22 07:10