Single Stream and Multiple Subscribers

I'm testing the waters with Java 9 reactive streams and RxJava 2. I don't have a preference for either, but I'm looking for guidance on whether the following is possible.

  1. I'm creating a pre-configured number of subscribers like so:

    for(int i = 0; i<MAX_SUBSCRIBERS; i++) {  
         System.out.println("Creating subscriber: " + i);  
         publisher.subscribe(new MySubscriber<>(i + "-subscriber"));   
    }
    
  2. I'm reading a list of files from a directory so they can be uploaded concurrently to a third-party system.

    Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files"));
    paths
    .filter(Files::isRegularFile)
    .forEach(pathName -> publisher.submit(pathName.toString()));
    

I'm receiving the following output:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    0-subscriber: /my/dir/with/files/test1.txt received in onNext
    1-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

Ideally, each subscriber should work on a unique file, so the output would look like this:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

Is this possible? Any tips would be awesome!

massnerder asked Jun 14 '26 13:06


1 Answer

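The Java 9 Flow API consists of four interfaces and the SubmissionPublisher class, which dispatches every submitted value to all of its Subscribers. That is why each of your subscribers sees every file. The JDK currently has no built-in tool for the dataflow you want, where each item goes to only one subscriber.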
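To see the broadcast behavior concretely, here is a minimal sketch that uses only the JDK. The names `BroadcastDemo`, `RecordingSubscriber`, and `run` are hypothetical, `RecordingSubscriber` is only a stand-in for the question's `MySubscriber` (whose source is not shown), and the file names are placeholders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BroadcastDemo {

    // Hypothetical stand-in for MySubscriber: records every item it receives.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final String name;
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        RecordingSubscriber(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no backpressure in this sketch
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Submits the items to a SubmissionPublisher with the given number of
    // subscribers and returns what each subscriber received, keyed by name.
    static Map<String, List<String>> run(List<String> items, int subscriberCount) {
        List<RecordingSubscriber> subscribers = new ArrayList<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            for (int i = 0; i < subscriberCount; i++) {
                RecordingSubscriber subscriber = new RecordingSubscriber(i + "-subscriber");
                subscribers.add(subscriber);
                publisher.subscribe(subscriber);
            }
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered

        Map<String, List<String>> result = new LinkedHashMap<>();
        for (RecordingSubscriber subscriber : subscribers) {
            try {
                if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException(subscriber.name + " did not complete");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            result.put(subscriber.name, new ArrayList<>(subscriber.received));
        }
        return result;
    }

    public static void main(String[] args) {
        run(List.of("test0.txt", "test1.txt"), 2)
                .forEach((name, items) -> System.out.println(name + " received " + items));
        // prints:
        // 0-subscriber received [test0.txt, test1.txt]
        // 1-subscriber received [test0.txt, test1.txt]
    }
}
```

Both subscribers end up with both items, which matches the duplicated output in the question.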

In contrast, RxJava is a rich, fluent library with hundreds of operators, and it lets you process items in parallel without duplicating them across subscribers:

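    ParallelFlowable<Path> pf =
            // using() opens the directory stream and closes it when the flow ends
            Flowable.<Path, Stream<Path>>using(
                () -> Files.list(Paths.get("/my/dir/with/files")),
                files -> Flowable.fromIterable((Iterable<Path>)() -> files.iterator()),
                AutoCloseable::close
            )
            // split into 2 rails; each path goes to exactly one rail
            .parallel(2)
            .runOn(Schedulers.computation())
            .filter(Files::isRegularFile);

    // one subscriber per rail: the array length must match the parallelism
    pf.subscribe(new Subscriber[] {
        new MySubscriber<>("0-subscriber"),
        new MySubscriber<>("1-subscriber"),
    });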

akarnokd answered Jun 17 '26 21:06


