I'm struggling to implement something I assumed would be fairly simple in Rx.
I have a list of items, and I want to have each item emitted with a delay.
It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.
Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.
    long timeNow = System.currentTimeMillis();

    Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList())
        .delay(50, TimeUnit.MILLISECONDS)
        .doOnNext(item -> {
            System.out.println(System.currentTimeMillis() - timeNow);
            System.out.println(item);
            System.out.println(" ");
        })
        .toList()
        .toBlocking()
        .first();
The result is:
    154ms [5]
    155ms [2]
    155ms [1]
    155ms [3]
    155ms [4]
But what I would expect to see is something like this:
    174ms [5]
    230ms [2]
    285ms [1]
    345ms [3]
    399ms [4]
What am I doing wrong?
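The simplest way to do this seems to be to use concatMap and wrap each item in a delayed Observable. concatMap subscribes to each inner Observable only after the previous one has completed, so the delays run back to back instead of all at once.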
    long startTime = System.currentTimeMillis();

    Observable.range(1, 5)
        .concatMap(i -> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i -> System.out.println(
            "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) + "ms"))
        .toCompletable()
        .await();
Prints:
    Item: 1, Time: 51ms
    Item: 2, Time: 101ms
    Item: 3, Time: 151ms
    Item: 4, Time: 202ms
    Item: 5, Time: 252ms
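To see why the two approaches differ, here is a rough timing model in plain Java. It is not RxJava code, only arithmetic on idealized emission times, and the class and method names (DelayTimingModel, shiftedTimes, sequentialTimes) are made up for illustration. It assumes range(1, 5) emits all five items at essentially the same instant (time 0) and ignores scheduler overhead.

```java
import java.util.Arrays;

// Hypothetical helper class: models emission times only, it does not use RxJava.
class DelayTimingModel {

    // Models source.delay(d): every item is shifted by the same offset
    // from the moment the source emitted it.
    static long[] shiftedTimes(long[] sourceTimes, long delayMs) {
        long[] out = new long[sourceTimes.length];
        for (int i = 0; i < sourceTimes.length; i++) {
            out[i] = sourceTimes[i] + delayMs;
        }
        return out;
    }

    // Models source.concatMap(i -> just(i).delay(d)): the delay for an item
    // starts only once the previous item's delayed Observable has completed.
    static long[] sequentialTimes(long[] sourceTimes, long delayMs) {
        long[] out = new long[sourceTimes.length];
        long previousDone = 0;
        for (int i = 0; i < sourceTimes.length; i++) {
            long start = Math.max(previousDone, sourceTimes[i]);
            out[i] = start + delayMs;
            previousDone = out[i];
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: all five items leave the source at time 0.
        long[] source = new long[5];
        System.out.println(Arrays.toString(shiftedTimes(source, 50)));
        // prints [50, 50, 50, 50, 50]
        System.out.println(Arrays.toString(sequentialTimes(source, 50)));
        // prints [50, 100, 150, 200, 250]
    }
}
```

The first line mirrors the question's output, where every group arrives at roughly the same time, and the second mirrors the concatMap output above, give or take a few milliseconds of overhead.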