Parallel dispatch of `groupBy` groups in Reactor

I'm learning Reactor, and I'm wondering how to achieve a certain behavior. Let's say I have a stream of incoming messages. Each message is associated with a certain entity and contains some data.

interface Message {
    String getEntityId();
    Data getData();
}

Messages relating to different entities can be processed in parallel. However, messages pertaining to any single entity must be processed one at a time, i.e. processing of message 2 for entity "abc" cannot start until processing of message 1 for entity "abc" has finished. While processing of a message is underway, further messages for that entity should be buffered. Messages for other entities can proceed unimpeded. One can think of it as there being one thread per entity running code like this:

public void run() {
    for (;;) {
        // Blocks until there's a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);

        // Blocks until processing is finished
        processMessage(msg);
    }
}

How can I achieve this with Reactor without blocking? The total message rate may be high, but the message rate per entity will be very low. The set of entities can be very large and is not necessarily known in advance.

I guess it might look something like this, but I don't know how to fill in the missing pieces:

{
    incomingMessages()
            .groupBy(Message::getEntityId)
            .flatMap(entityStream -> entityStream
                    /* ... */
                    .map(msg -> /* process the message */))
            /* ... */
}

public static Stream<Message> incomingMessages() { /* ... */ }
asked Jul 02 '15 by Viktor Dahl


2 Answers

With Project Reactor you can solve it this way:

@Test
public void testMessages() {
    // One shared pool; publishOn hands each group its own worker, so messages
    // within a group are processed serially while different groups run in parallel.
    Scheduler scheduler = Schedulers.newParallel("groupByPool", 16);
    Flux.fromStream(incomingMessages())
            .groupBy(Message::getEntityId)
            .map(g -> g.publishOn(scheduler)) // create a new publisher for each group of messages
            .subscribe( // create a consumer for the main stream of groups
                    stream ->
                            stream.subscribe(this::processMessage) // create a consumer for each group stream
            );
}

public Stream<Message> incomingMessages() {
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
    private final int id;
    private final int entityId;

    public Message(int id, int entityId) {
        this.id = id;
        this.entityId = entityId;
    }

    public int getId() {
        return id;
    }

    public int getEntityId() {
        return entityId;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", entityId=" + entityId +
                '}';
    }
}

I think a similar solution is possible in RxJava.
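
For example, here is a rough sketch of the same idea in RxJava 2 (my own assumption, not tested): observeOn assigns each group subscription a single worker from the computation pool, which keeps messages within a group ordered while different groups run on different workers.

// io.reactivex.Observable and io.reactivex.schedulers.Schedulers (RxJava 2)
Observable.range(0, 100)
        .map(i -> new Message(i, i % 10))
        .groupBy(Message::getEntityId)
        .flatMap(group -> group
                .observeOn(Schedulers.computation()) // one worker per group subscription
                .doOnNext(this::processMessage))     // runs serially within each group
        .subscribe();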

answered Nov 14 '22 by Vlad Bochenin


We had the same problem in our project: entities with the same id had to be processed sequentially, while entities with different ids could be processed in parallel.

The solution turned out to be very simple: instead of flatMap, we started using concatMap. From the Javadoc of concatMap:

    Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
    then flatten these inner publishers into a single {@link Flux}, sequentially and
    preserving order using concatenation.

Code example:

public void receive(Flux<Data> data) {
    data
        .groupBy(Data::getPointID)
        .flatMap(service::process)
        .onErrorContinue(Logging::logError)
        .subscribe();
}

process method:

Flux<SomeEntity> process(Flux<Data> dataFlux) {
    return dataFlux
        .doOnNext(Logging::logReceived)
        .concatMap(this::proceedDefinitionsSearch)
        .doOnNext(Logging::logDefSearch)
        .flatMap(this::processData)
        .doOnNext(Logging::logDataProcessed)
        .concatMap(repository::save)
        .doOnNext(Logging::logSavedEntity);
}
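
Distilled into a self-contained sketch (class and method names below are placeholders, not the original service code): groupBy splits the stream per key, flatMap lets the groups run concurrently, and concatMap inside each group subscribes to one inner publisher at a time. One caveat: groupBy works best when the number of simultaneously active groups stays below flatMap's concurrency (256 by default); with more open groups the pipeline can stall.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class GroupedProcessingSketch {

    public static void main(String[] args) {
        Flux.range(0, 100)
                .groupBy(i -> i % 10)                  // one group per (placeholder) entity id
                .flatMap(group -> group                // groups are processed concurrently
                        .concatMap(GroupedProcessingSketch::processData)) // but serially within a group
                .blockLast();                          // block only for the demo
    }

    // Placeholder for the real asynchronous processing step
    private static Mono<Integer> processData(int value) {
        return Mono.fromCallable(() -> {
                    System.out.printf("Processed %d on %s%n", value, Thread.currentThread().getName());
                    return value;
                })
                .subscribeOn(Schedulers.boundedElastic());
    }
}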
answered Nov 14 '22 by Mr Jedi