Basically, we have a producer that produces one random number at a time, and several consumers that each sleep for 1 second and then print a number.
Consumers are exclusive: every number is received by exactly one consumer.
This behavior is similar to a JMS queue or a BlockingQueue in Java.
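To make the behavior I am after concrete, here is a minimal sketch using only java.util.concurrent. The class name QueueStyleSketch, the STOP marker, and the use of sequential numbers instead of random ones (so that duplicates would be easy to spot) are all just illustrative assumptions, and the sleep is shortened so the demo finishes quickly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueStyleSketch {

    // Hypothetical marker value telling a consumer to stop.
    private static final int STOP = -1;

    // One producer, several consumers; returns every number the consumers received.
    static List<Integer> run(int count, int consumerCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        List<Thread> consumers = new ArrayList<>();

        for (int c = 0; c < consumerCount; c++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int n = queue.take();   // only one consumer gets each element
                        if (n == STOP) {
                            return;
                        }
                        Thread.sleep(10);       // stands in for the 1 second sleep
                        received.add(n);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            consumers.add(t);
        }

        try {
            for (int n = 0; n < count; n++) {
                queue.put(n);                   // the producer side
            }
            for (int c = 0; c < consumerCount; c++) {
                queue.put(STOP);                // one stop marker per consumer
            }
            for (Thread t : consumers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<Integer> got = run(10, 3);
        System.out.println(got.size() + " received, "
                + new HashSet<>(got).size() + " distinct");
    }
}
```

Every number ends up with exactly one consumer, which is the delivery style I would like to get from RxJava.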
In Akka Streams, I can find
balance[T] – (1 input, N outputs) given an input element emits to one of its output ports.
But I can't find any built-in component in RxJava that does the same job.
An Observable always broadcasts each message to all of its observers, pub-sub style. What should I do if I need queue-style delivery?
Am I missing anything?
I think the mental model you have doesn't really match what Rx is built around - think streams of many small operations, not messages between large-ish components.
I'd suggest a) a capped thread pool, b) an Rx scheduler around that, and then c):
    databaseSource
        .fetchItems()
        .flatMap(item ->
            Observable.just(item)
                .observeOn(cappedThreadScheduler)
                .map(i -> longRunningOperation(i))
        )
OTOH, you can do it like this too:
    databaseSource
        .fetchItems()
        .flatMap(item ->
            Observable.just(item)
                .observeOn(Schedulers.io())
                .map(i -> longRunningOperation(i))
        , 16
        )
The second argument to flatMap limits how many inner Observables are subscribed at once, so at most 16 operations run in parallel.
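The first variant leaves the capped thread pool itself unspecified. As a rough sketch of that piece alone, in plain JDK code with no RxJava dependency: the class name CappedPoolSketch, the method processAll, and the body of longRunningOperation are hypothetical stand-ins, not part of the code above. Each item is handed to exactly one worker thread, and no more than maxParallel items are processed at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CappedPoolSketch {

    // Hypothetical stand-in for longRunningOperation(item).
    static int longRunningOperation(int item) {
        try {
            Thread.sleep(50); // simulate slow work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item * 2;
    }

    // Processes every item exactly once, with at most maxParallel running at a time.
    static List<Integer> processAll(List<Integer> items, int maxParallel) {
        ExecutorService pool = Executors.newFixedThreadPool(maxParallel);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int item : items) {
                Callable<Integer> task = () -> longRunningOperation(item);
                futures.add(pool.submit(task)); // each task is picked up by one worker
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get());           // collected in submission order
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [2, 4, 6, 8, 10]
        System.out.println(processAll(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

For step b), an executor like this one can be wrapped with Schedulers.from(executor) to obtain something to pass as cappedThreadScheduler; in that case the pool should stay alive for as long as the stream runs rather than being shut down right after submission as in this sketch.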