I am learning Reactor Core and following this tutorial: https://www.baeldung.com/reactor-core
ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(arrList::add);
System.out.println("After: " + arrList);
When I execute the code above, it prints:
Before: []
[DEBUG] (main) Using Console logging
After: []
The code above should start executing on another thread, but it does not seem to run at all. Can somebody help me with this?
As mentioned in the Reactor documentation for the various subscribe methods:
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
This means that the end of the main method is reached, and thus the main thread exits before any thread is able to subscribe to the Reactive chain, as mentioned by Piotr.
What you want to do is wait till the entire chain completes before printing the contents of the array.
The naive way of doing this is:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.blockLast();
System.out.println("After: " + arrList);
Here, you block execution on the main thread until the last element on the Flux is processed. Thus the last System.out will not execute until your ArrayList is fully populated.
Remember that code runs a little differently in a console application than in a server environment like Netty. The only way to make a console application wait for all subscriptions to kick in is to block.
But blocking is not permitted on Reactor's non-blocking threads, such as those of Schedulers.parallel(); calling block() on one of them fails with an IllegalStateException. So this approach would not work in, say, a Netty environment. There your server would be running until it is explicitly shut down, and so a subscribe would be fine.
However, in the above code snippet you are blocking not just to prevent the application from exiting, but also to wait before you read the data that has been populated.
An improvement to the above code would be as follows:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.doOnComplete(() -> System.out.println("After: " + arrList))
.blockLast();
Even here, the doOnComplete accesses data from outside the reactive chain. To prevent this, you would collect the elements of the Flux in the chain itself, like this:
System.out.println("Before.");
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.collectList()
.doOnSuccess(list -> System.out.println("After: " + list))
.block();
Again, remember that when running on Netty (say, in a Spring WebFlux application), the above code would end in a subscribe() instead.
Note, though, that switching from a Flux to a List (or any Collection) means you are switching out of the reactive paradigm into imperative programming. You should be able to implement any functionality within the Reactive paradigm itself.
I think there is some confusion. When you call subscribeOn(Schedulers.parallel()), you specify that you want the items to be emitted and received on a different thread. You also have to make the main thread wait so that the subscription can actually kick in (that is why I added Thread.sleep(100)). If you run the code below, it works. As you can see, there is no magic synchronization mechanism in Reactor.
ArrayList<Integer> arrList = new ArrayList<Integer>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(
t -> {
System.out.println(t + " thread id: " + Thread.currentThread().getId());
arrList.add(t);
}
);
System.out.println("size of arrList(before the wait): " + arrList.size());
System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
Thread.sleep(100);
System.out.println("size of arrList(after the wait): " + arrList.size());
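The Thread.sleep(100) above is a timing guess: it works only as long as the pipeline finishes within 100 ms. A more deterministic way to wait is a CountDownLatch that the asynchronous side releases when it is done. The following is a minimal sketch of that pattern, not Reactor code: a plain ExecutorService stands in for the parallel scheduler so that the example runs with the JDK alone, and the class and method names (LatchWaitDemo, collectDoubled) are made up for illustration. In the Reactor version, the countDown() call would go into the error and completion callbacks passed to subscribe(...).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: an ExecutorService stands in for Reactor's scheduler.
final class LatchWaitDemo {

    // Doubles 1..4 on a worker thread and returns only after the worker has finished.
    static List<Integer> collectDoubled() throws InterruptedException {
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.execute(() -> {
                try {
                    for (int i = 1; i <= 4; i++) {
                        results.add(i * 2);
                    }
                } finally {
                    // Release the waiting thread even if the work fails.
                    done.countDown();
                }
            });
            // Block the caller until the worker signals; the timeout is only a safety net.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } finally {
            worker.shutdown();
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("After: " + collectDoubled()); // After: [2, 4, 6, 8]
    }
}
```

Unlike the sleep, the latch makes the main thread wait exactly as long as the work takes, and the await/countDown pair also guarantees that the list contents written by the worker are visible to the main thread.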
If you want to add your items to the list in parallel, Reactor is not a good choice. It is better to use parallel streams from Java 8:
List<Integer> collect = Stream.of(1, 2, 3, 4)
.parallel()
.map(i -> i * 2)
.collect(Collectors.toList());
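For completeness, here is the same snippet as a self-contained program that can be compiled and run as is. The class and method names (ParallelStreamDemo, doubledInParallel) are only illustrative. Note that collect() is a terminal operation: it returns only after every element has been processed, so no sleep or latch is needed, and because the stream is ordered, the resulting list keeps the encounter order even though the mapping may run on several threads.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class wrapping the parallel-stream snippet.
final class ParallelStreamDemo {

    // Doubles each element, potentially on several threads, and collects the results.
    static List<Integer> doubledInParallel() {
        return Stream.of(1, 2, 3, 4)
                .parallel()
                .map(i -> i * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubledInParallel()); // [2, 4, 6, 8]
    }
}
```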
The tutorial you posted is not very precise when it comes to the concurrency part. To the author's credit, he or she says that more articles are to come. But I don't think that particular example should have been posted at all, as it creates confusion. I suggest not trusting resources on the internet that much :)