Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java 9 - how publisher and subscriber works

I am trying to understand how Subscriber and Publisher works in java 9.

Here I have created one subscriber here and using SubmissionPublisher for publishing item .

I am trying to publish 100 strings to subscriber. If I do not make the Client program to sleep (see commented code in MyReactiveApp), I do not see all the items are published.

why is it not waiting for all the strings processed here:

strs.stream().forEach(i -> publisher.submit(i)); // what happens here? 

If I replace the above code with, I see all the strings are printed in console

strs.stream().forEach(System.out::println);

Client program that publishes using SubmissionPublisher.

import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MyReactiveApp {

    public static void main(String args[]) throws InterruptedException {

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);


        List<String> strs = getStrs();

        System.out.println("Publishing Items to Subscriber");
        strs.stream().forEach(i -> publisher.submit(i));

        /*while (strs.size() != subs.getCounter()) {
            Thread.sleep(10);
        }*/

        //publisher.close();

        System.out.println("Exiting the app");

    }

    private static List<String> getStrs(){

        return Stream.generate(new Supplier<String>() {
            int i =1;
            @Override
            public String get() {
                return "name "+ (i++);
            }
        }).limit(100).collect(Collectors.toList());
    }

}

Subscriber

import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>{

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(100);

    }

    @Override
    public void onNext(String item) {
        System.out.println(this.getClass().getSimpleName()+" item "+item);
        //subscription.request(1);
        counter++;

    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(this.getClass().getName()+ " an error occured "+throwable);

    }

    @Override
    public void onComplete() {
        System.out.println("activity completed");

    }
    public int getCounter() {
        return counter;
    }

}

output:

Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12
like image 790
Shiva Avatar asked Dec 05 '18 12:12

Shiva


People also ask

What is publisher and subscriber in Java?

In the publish/subscribe domain, message producers are called publishers and message consumers are called subscribers. They exchange messages by means of a destination called a topic: publishers produce messages to a topic; subscribers subscribe to a topic and consume messages from a topic.

What is the difference between a publisher and a subscriber?

The provider of the information is called a publisher . Publishers supply information about a subject without needing to know anything about the applications that are interested in the information. The consumer of the information is called a subscriber .

What is a publisher in Java?

A Publisher is a provider of an unbounded number of sequenced elements publishing them according to demand received from its Subscribers. Publisher<T> interface is responsible for publishing elements of type T and provides a subscribe() method for subscribers to connect to it.

Can a publisher also be a subscriber?

Yes, a component can be both subscriber and publisher.


1 Answers

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers

see: https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/SubmissionPublisher.html#SubmissionPublisher--

So actually

    strs.stream().forEach(i -> publisher.submit(i));

enqueues all submissions and delivers them asynchronously on another thread. But then the application is terminated. This is independent of the progress of the worker thread. This means that the application is terminated regardless of how many elements the worker thread has already delivered.

This can be different for each run. In the worst case, the application could be terminated before the first item is delivered.

Threads

If you want to verify that the main method of MyReactiveApp and the delivery in MySubscriber's onNext happen on different threads you can print out the names of the corresponding threads, e.g. in MyReactiveApp's main:

System.out.println(Thread.currentThread().getName()) 

will output main as thread name.

Whereas MySubscriber's onNext method will e.g. output something like ForkJoinPool.commonPool-worker-1.

User and Deamon Threads

Why does the application terminate although we still have a running thread?

There are two kind of threads in Java:

  • user threads
  • daemon threads

A Java program terminates when no longer any user threads are running, even when deamon threads are still running.

The main thread is a user thread. The SubmissionPublisher uses here worker threads from ForkJoinPool.commonPool(). These are daemon threads.

All worker threads are initialized with Thread.isDaemon() set true.

https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html

like image 138
Stephan Schlecht Avatar answered Sep 22 '22 00:09

Stephan Schlecht