Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement balance fanout in rxjava?

Tags:

rx-java

Basically, we have a producer which produce a random number at a time and several consumers which sleep 1 second then print a number.

Every consumer is exclusive, every number can only have one receiver.
This behavior is similar to JMS queue or BlockingQueue in java.

In akka stream, I can find

balance[T] – (1 input, N outputs) given an input element emits to one of its output ports.

But I can't find any built-in components in rxjava do the same job.
Observable always broadcast message to all observers like pub-sub style. What should I do if I need queue style.

Am I miss anything?

like image 434
XuDong Wu Avatar asked Dec 22 '25 20:12

XuDong Wu


1 Answers

I think the mental model you have doesn't really match what Rx is built around - think streams of many small operations, not messages between large-ish components.

I'd suggest a) a capped thread pool b) an RX scheduler around that and then c:

databaseSource
.fetchItems()
.flatMap(item -> 
   Obsevable.just(item)
   .observeOn(cappedThreadScheduler)
   .map(item -> longRunningOperation(item))
)

OTOH, you can do it like this too:

databaseSource
.fetchItems()
.flatMap(item -> 
   Obsevable.just(item)
   .observeOn(schedulers.io())
   .map(item -> longRunningOperation(item))
   , 16
)

To have at most 16 operations running in parallel.

like image 131
Tassos Bassoukos Avatar answered Dec 24 '25 11:12

Tassos Bassoukos



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!