
Spring Integration: how to process multiple messages at one time?

I have the following configuration:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
    <int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>

The message handler is defined as:

@Override
public void handleMessage(Message<?> message) throws MessagingException {
    Object payload = message.getPayload();
    if (payload instanceof LogEntry) {
        LogEntry logEntry = (LogEntry) payload;
        String app = (String) message.getHeaders().get("app");
        logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
        logEntryPostProcessService.postProcess(app, logEntry);
    } else {
        throw new MessageRejectedException(message, "Unknown data type has been received.");
    }
}

What I would like to have is something like

@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}

Basically, the poller would pass all 10 messages in a single call instead of calling the method 10 times, once per message.

The reason is to be able to process the messages in bulk, one chunk at a time, and so improve performance.

selvinsource asked Aug 29 '14 08:08


1 Answer

That's right: the handler is called once per message, because of this loop in AbstractPollingEndpoint:

taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        int count = 0;
        while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
...
            if (!pollingTask.call()) {
                break;
            }
...
    }
});

Hence all your messages (up to max-messages-per-poll) are handled within the same thread. However, they are sent to the handler one by one, not as a single batch.
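The effect of that loop can be reproduced without the framework. The following is only a plain-JDK sketch of the behavior described above, not Spring Integration code: the class PollLoopSketch and the method pollOnce are hypothetical names, and a java.util.Queue stands in for the polled channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class PollLoopSketch {

    /**
     * Mimics one poll cycle: takes up to maxMessagesPerPoll items from the
     * queue and passes each one to the handler individually, on the calling
     * thread. Returns the number of handler invocations.
     */
    public static <T> int pollOnce(Queue<T> queue, int maxMessagesPerPoll, Consumer<T> handler) {
        int count = 0;
        // A non-positive limit means "keep going until the queue is empty".
        while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
            T message = queue.poll();
            if (message == null) {
                break; // nothing left to receive, end this cycle early
            }
            handler.accept(message); // one call per message, never a list
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 0; i < 25; i++) {
            queue.add("entry-" + i);
        }
        List<String> handled = new ArrayList<>();
        int calls = pollOnce(queue, 10, handled::add);
        // prints "10 handler calls, 15 messages still queued"
        System.out.println(calls + " handler calls, " + queue.size() + " messages still queued");
    }
}
```

So max-messages-per-poll only limits how many single-message calls happen per cycle; it never changes the shape of what the handler receives.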

To process them in parallel, put an ExecutorChannel in front of your logEntryPostProcessorReceiver. Something like this:

<channel id="executorChannel">
   <dispatcher task-executor="threadPoolExecutor"/>
</channel>

<bridge input-channel="logEntryChannel" output-channel="executorChannel">
   <poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>

<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>
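As a rough analogy for what the executor-backed channel does, here is a plain-JDK sketch in which each polled message is handed to a thread pool instead of being handled on the poller thread. It does not use Spring Integration; ExecutorDispatchSketch and dispatchAll are hypothetical names, and the pool plays the role of the threadPoolExecutor bean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDispatchSketch {

    /**
     * Submits every message to a fixed-size pool and waits for all of them.
     * Returns a map from message to the name of the worker thread that
     * handled it, so the hand-off away from the caller thread is visible.
     */
    public static Map<String, String> dispatchAll(List<String> messages, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String message : messages) {
                // Each message is still handled individually, just on a worker thread.
                pending.add(pool.submit(() -> {
                    handledBy.put(message, Thread.currentThread().getName());
                }));
            }
            for (Future<?> future : pending) {
                future.get(); // rethrows any failure from the worker
            }
        } finally {
            pool.shutdown();
        }
        return handledBy;
    }

    public static void main(String[] args) throws Exception {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("entry-" + i);
        }
        dispatchAll(messages, 4).forEach((msg, thread) -> System.out.println(msg + " -> " + thread));
    }
}
```

Note that this gives concurrency, not batching: the handler still sees one message per call.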

UPDATE

To process messages as one batch, you should aggregate them. Since they all come from a polling endpoint, the messages carry no sequenceDetails. You can work around that by using a fake value for the correlationId:

<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
        release-strategy-expression="size() == 10"/>

The 10 in size() == 10 should match max-messages-per-poll.

After that, your logEntryPostProcessorReceiver has to accept a list of payloads, or a single message whose payload is the list produced by the <aggregator>.
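To make the idea concrete, here is a framework-free sketch of the same correlate-then-release logic feeding a handler that takes a whole list. It is only an illustration under assumptions: BatchAggregatorSketch, add and pendingCount are hypothetical names, not the Spring Integration aggregator, and the sketch simply clears a group once it has been released.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class BatchAggregatorSketch<T> {

    private final int releaseSize;
    private final Consumer<List<T>> batchHandler;
    private final Map<Object, List<T>> groups = new HashMap<>();

    public BatchAggregatorSketch(int releaseSize, Consumer<List<T>> batchHandler) {
        this.releaseSize = releaseSize;
        this.batchHandler = batchHandler;
    }

    /**
     * Adds a message to the group for its correlation key. Once the group
     * reaches releaseSize it is removed and handed to the handler as one list.
     */
    public synchronized void add(Object correlationKey, T message) {
        List<T> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(message);
        if (group.size() == releaseSize) {
            groups.remove(correlationKey);
            batchHandler.accept(group); // a single call with the whole batch
        }
    }

    /** Number of messages still waiting in the (incomplete) group for this key. */
    public synchronized int pendingCount(Object correlationKey) {
        List<T> group = groups.get(correlationKey);
        return group == null ? 0 : group.size();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchAggregatorSketch<String> aggregator =
                new BatchAggregatorSketch<>(10, batch -> batchSizes.add(batch.size()));
        // Same trick as the correlation expression: the polling thread's id is the key.
        long key = Thread.currentThread().getId();
        for (int i = 0; i < 25; i++) {
            aggregator.add(key, "entry-" + i);
        }
        System.out.println(batchSizes);                   // prints [10, 10]
        System.out.println(aggregator.pendingCount(key)); // prints 5
    }
}
```

The run shows one consequence of a fixed-size release rule: a partial group (here the last 5 messages) waits until enough further messages arrive. In Spring Integration, what happens to later messages that share the correlation key of an already released group depends on the aggregator's configuration (for example its expire-groups-upon-completion setting), so that is worth checking against the reference documentation before relying on a constant key such as the thread id.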

Artem Bilan answered Oct 09 '22 09:10
