RxJava delay for each item of list emitted

I'm struggling to implement something I assumed would be fairly simple in Rx.

I have a list of items, and I want to have each item emitted with a delay.

It seems the Rx delay() operator shifts the emission of the whole stream by the specified delay, rather than spacing the individual items apart.

Here's some test code. It groups the items into lists, and each list should then be emitted after its own delay.

    // Reference point for the elapsed-time printout below.
    long timeNow = System.currentTimeMillis();

    Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList())
        .delay(50, TimeUnit.MILLISECONDS)
        .doOnNext(item -> {
            System.out.println(System.currentTimeMillis() - timeNow);
            System.out.println(item);
            System.out.println(" ");
        })
        .toList()
        .toBlocking()
        .first();

The result is:

    154ms [5]
    155ms [2]
    155ms [1]
    155ms [3]
    155ms [4]

But what I would expect to see is something like this:

    174ms [5]
    230ms [2]
    285ms [1]
    345ms [3]
    399ms [4]

What am I doing wrong?

asked Oct 22 '15 at 21:10 by athor


1 Answer

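The simplest way to do this seems to be to use concatMap and wrap each item in a delayed Observable. Because concatMap subscribes to the next inner Observable only after the previous one has completed, the 50 ms delays run one after another instead of all at once.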

    long startTime = System.currentTimeMillis();

    Observable.range(1, 5)
        // Wrap each item in its own Observable that emits after 50 ms.
        .concatMap(i -> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i -> System.out.println(
            "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) + "ms"))
        // Block until the sequence completes.
        .toCompletable()
        .await();

Prints:

    Item: 1, Time: 51ms
    Item: 2, Time: 101ms
    Item: 3, Time: 151ms
    Item: 4, Time: 202ms
    Item: 5, Time: 252ms
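
The difference between the two timings can be worked out with simple arithmetic. The following is only a rough plain-JDK sketch of that arithmetic, not RxJava code and not how RxJava is implemented: the class name DelayTimingSketch and both method names are hypothetical, and it assumes the source emits every item at time zero and that scheduling overhead is negligible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models expected emission offsets in milliseconds.
final class DelayTimingSketch {

    // Models delay(d) applied to the whole stream: every item is shifted
    // by the same amount, so all items arrive together.
    static List<Long> shiftedOffsets(int itemCount, long delayMs) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < itemCount; i++) {
            offsets.add(delayMs);
        }
        return offsets;
    }

    // Models concatMap over a delayed inner source per item: each delay
    // starts only when the previous one has finished, so they accumulate.
    static List<Long> sequentialOffsets(int itemCount, long delayMs) {
        List<Long> offsets = new ArrayList<>();
        long clock = 0;
        for (int i = 0; i < itemCount; i++) {
            clock += delayMs;
            offsets.add(clock);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(shiftedOffsets(5, 50));    // [50, 50, 50, 50, 50]
        System.out.println(sequentialOffsets(5, 50)); // [50, 100, 150, 200, 250]
    }
}
```

The first list matches the shape of the output in the question (everything at roughly the same time), and the second matches the output above, give or take a few milliseconds of overhead.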
answered Oct 07 '22 at 23:10 by Magnus