I have tried to run each computation on different thread, but whatever Scheduler i used it running always on single thread.
PublishProcessor processor = PublishProcessor.create();
processor
.doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
.observeOn(Schedulers.newThread()).subscribe(i -> {
System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
Thread.currentThread().sleep(5000);
});
processor.onNext(2);
processor.onNext(3);
processor.onNext(4);
processor.onNext(5);
processor.onNext(6);
while (true) {}
The output would be:
2 emitted on 1
3 emitted on 1
4 emitted on 1
5 emitted on 1
6 emitted on 1
2 received on 13
3 received on 13
4 received on 13
5 received on 13
6 received on 13
Thread 13 processes the next value only after sleeping, but i want to have few separate sleeping threads in that case. Can someone explain what I'm doing wrong, please?
.observeOn(...) makes effect by changing to the item flow to another thread but it's always the same thread.
If you want to create a new thread for every item you can do
processor
.doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
.flatMap(item -> Observable.just(item)
.subscribeOn(Schedulers.newThread())) // make every item change to a new thread
.subscribe(i -> {
System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
Thread.currentThread().sleep(5000);
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With