I'm learning Reactor, and I'm wondering how to achieve a certain behavior. Let's say I have a stream of incoming messages. Each message is associated with a certain entity and contains some data.
interface Message {
    String getEntityId();
    Data getData();
}
Messages relating to different entities can be processed in parallel. However, messages pertaining to any single entity must be processed one at a time: processing of message 2 for entity "abc" cannot start until processing of message 1 for entity "abc" has finished. While a message is being processed, further messages for that entity should be buffered. Messages for other entities can proceed unimpeded.
One can think of it as there being one thread per entity, running code like this:
public void run() {
    for (;;) {
        // Blocks until there's a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);
        // Blocks until processing is finished
        processMessage(msg);
    }
}
How can I achieve this with Reactor without blocking? The total message rate may be high, but the message rate per entity will be very low. The set of entities can be very large and is not necessarily known in advance.
I guess it might look something like this, but I don't know.
{
    incomingMessages()
        .groupBy(Message::getEntityId)
        .flatMap(entityStream -> entityStream
            /* ... */
            .map(msg -> /* process the message */))
        /* ... */
}
public static Stream<Message> incomingMessages() { /* ... */ }
With Project Reactor you can solve it this way:
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.stream.IntStream;
import java.util.stream.Stream;

@Test
public void testMessages() {
    Flux.fromStream(incomingMessages())
        .groupBy(Message::getEntityId)
        .map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) // create a new publisher for each group of messages
        .subscribe( // create a consumer for the main stream
            stream ->
                stream.subscribe(this::processMessage) // create a consumer for each group stream
        );
}

public Stream<Message> incomingMessages() {
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
    private final int id;
    private final int entityId;

    public Message(int id, int entityId) {
        this.id = id;
        this.entityId = entityId;
    }

    public int getId() {
        return id;
    }

    public int getEntityId() {
        return entityId;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", entityId=" + entityId +
                '}';
    }
}
I think a similar solution is possible in RxJava.
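For reference, a rough RxJava 2 sketch of the same idea might look like the following (reusing the Message class and processMessage from above; testMessagesRx and its messages parameter are illustrative names, not from the answer). Each observeOn subscription runs on a single worker that executes tasks one at a time, so per-entity order is preserved while different groups may run on different threads:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

public void testMessagesRx(List<Message> messages) {
    Flowable.fromIterable(messages)
        .groupBy(Message::getEntityId)
        .flatMap(group -> group
            // each group gets its own observeOn worker, which runs
            // tasks sequentially, so per-entity order is preserved
            .observeOn(Schedulers.computation())
            .doOnNext(this::processMessage))
        .subscribe();
}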
We had the same problem in our project: entities with the same ID had to be processed sequentially, while entities with different IDs could be processed in parallel.
The solution turned out to be very simple. Instead of flatMap, we started using concatMap. From the docs of concatMap:
* Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
* then flatten these inner publishers into a single {@link Flux}, sequentially and
* preserving order using concatenation.
Code example:
public void receive(Flux<Data> data) {
    data
        .groupBy(Data::getPointID)
        .flatMap(service::process)
        .onErrorContinue(Logging::logError)
        .subscribe();
}
process method:
Flux<SomeEntity> process(Flux<Data> dataFlux) {
    return dataFlux
        .doOnNext(Logging::logReceived)
        .concatMap(this::proceedDefinitionsSearch)
        .doOnNext(Logging::logDefSearch)
        .flatMap(this::processData)
        .doOnNext(Logging::logDataProcessed)
        .concatMap(repository::save)
        .doOnNext(Logging::logSavedEntity);
}
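Tying this back to the original question, the two answers combine naturally: groupBy splits the stream per entity, flatMap lets the groups run concurrently, and concatMap inside each group guarantees one-at-a-time processing per entity. Here is a minimal self-contained sketch under those assumptions (the class, the demo data, and processMessage are illustrative, not from either answer; it uses a Java 16+ record for brevity):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PerEntityOrderingDemo {

    public static void main(String[] args) {
        Flux.range(0, 100)
            .map(i -> new Message(i, i % 10)) // 100 messages across 10 entities
            .groupBy(Message::entityId)
            // flatMap lets the groups run concurrently...
            .flatMap(group -> group
                // ...while concatMap subscribes to one inner Mono at a time,
                // so messages of a single entity never overlap
                .concatMap(msg -> processMessage(msg)
                    .subscribeOn(Schedulers.boundedElastic())))
            .blockLast(); // block only to keep the demo alive
    }

    // Wraps the work in a Mono so nothing blocks the grouping pipeline
    static Mono<Message> processMessage(Message msg) {
        return Mono.fromCallable(() -> {
            System.out.printf("entity %d, message %d on %s%n",
                    msg.entityId(), msg.id(), Thread.currentThread().getName());
            return msg;
        });
    }

    record Message(int id, int entityId) {}
}

One caveat worth noting: flatMap only subscribes to a limited number of inner publishers at once (256 by default), so with a very large set of entities both that concurrency limit and groupBy's buffering deserve attention.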