Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava scheduler always works in the same thread with sleep

I have tried to run each computation on different thread, but whatever Scheduler i used it running always on single thread.

PublishProcessor processor = PublishProcessor.create();

    processor
        .doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
        .observeOn(Schedulers.newThread()).subscribe(i -> {
            System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
            Thread.currentThread().sleep(5000);
        });
    processor.onNext(2);
    processor.onNext(3);
    processor.onNext(4);
    processor.onNext(5);
    processor.onNext(6);


    while (true) {}

The output would be:

2 emitted on 1
3 emitted on 1
4 emitted on 1
5 emitted on 1
6 emitted on 1
2 received on 13
3 received on 13
4 received on 13
5 received on 13
6 received on 13

Thread 13 processes the next value only after sleeping, but i want to have few separate sleeping threads in that case. Can someone explain what I'm doing wrong, please?

like image 975
V. Uspensky Avatar asked Dec 16 '25 13:12

V. Uspensky


1 Answers

.observeOn(...) makes effect by changing to the item flow to another thread but it's always the same thread.

If you want to create a new thread for every item you can do

processor
    .doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
    .flatMap(item -> Observable.just(item)
                         .subscribeOn(Schedulers.newThread()))    // make every item change to a new thread
    .subscribe(i -> {
        System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
        Thread.currentThread().sleep(5000);
    });
like image 61
Alberto S. Avatar answered Dec 19 '25 05:12

Alberto S.



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!