I have the following configuration:
<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactoryDefault"/>
</bean>
<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
<int:queue message-store="mongoDbMessageStore"/>
</int:channel>
<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
<int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>
And the message handler is defined as:
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
if (payload instanceof LogEntry) {
LogEntry logEntry = (LogEntry) payload;
String app = (String) message.getHeaders().get("app");
logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
logEntryPostProcessService.postProcess(app, logEntry);
} else {
throw new MessageRejectedException(message, "Unknown data type has been received.");
}
}
What I would like to have is something like:
@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}
So basically, the poller would send all 10 messages in one call instead of calling the method 10 times, once per message.
The reason for this is to be able to bulk-process the messages in chunks, thereby improving performance.
That's true, because of this loop in AbstractPollingEndpoint:
taskExecutor.execute(new Runnable() {
@Override
public void run() {
int count = 0;
while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
...
if (!pollingTask.call()) {
break;
}
...
}
}
});
Hence all the messages of one poll (up to max-messages-per-poll) are handled within the same thread.
However, they are sent to the handler one by one, not as a single batch.
To process them in parallel, you should use an ExecutorChannel before your logEntryPostProcessorReceiver. Something like this:
<channel id="executorChannel">
<dispatcher task-executor="threadPoolExecutor"/>
</channel>
<bridge input-channel="logEntryChannel" output-channel="executorChannel">
<poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>
<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>
UPDATE
To process the messages as one batch you should aggregate them. Since they all come from a polling endpoint, there are no sequenceDetails in the messages. You can work around that with a fake value for the correlationId:
<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 10"/>
Here the 10 in size() == 10 should be equal to max-messages-per-poll.
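The effect of those two expressions can be modelled in plain Java: payloads are grouped by a correlation key and a group is released once it reaches a fixed size. This is a rough sketch of the idea, not the aggregator's actual implementation. `SizeBasedAggregatorSketch`, `add`, and `pending` are hypothetical names, and the constant key stands in for the thread id.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class SizeBasedAggregatorSketch<K, T> {

    private final int releaseSize;
    private final Consumer<List<T>> output;
    private final Map<K, List<T>> groups = new HashMap<>();

    SizeBasedAggregatorSketch(int releaseSize, Consumer<List<T>> output) {
        this.releaseSize = releaseSize;
        this.output = output;
    }

    /** Adds a payload to its group and releases the group once it is full. */
    void add(K correlationKey, T payload) {
        List<T> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() == releaseSize) { // models size() == 10
            groups.remove(correlationKey);
            output.accept(group);
        }
    }

    /** Number of payloads still waiting in an unreleased group. */
    int pending(K correlationKey) {
        return groups.getOrDefault(correlationKey, Collections.emptyList()).size();
    }

    public static void main(String[] args) {
        List<List<String>> released = new ArrayList<>();
        SizeBasedAggregatorSketch<Long, String> aggregator =
                new SizeBasedAggregatorSketch<>(10, released::add);
        Long key = 1L; // stand-in for the polling thread's id
        for (int i = 0; i < 25; i++) {
            aggregator.add(key, "entry-" + i);
        }
        System.out.println(released.size() + " batches released, "
                + aggregator.pending(key) + " payloads still waiting");
    }
}
```

In this model, 25 payloads produce two full batches while the remaining 5 wait for the group to fill up, which is worth keeping in mind when a poll returns fewer messages than the release size.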
After that, your logEntryPostProcessorReceiver has to accept a list of payloads, or a single message whose payload is a list, as produced by the <aggregator>.
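A receiver that accepts such a list could look roughly like the following. This is a plain-Java sketch with no Spring types: `BatchReceiverSketch`, `handleBatch`, `BulkPostProcessor`, and the nested `LogEntry` are hypothetical stand-ins for the classes in the question, and handling of the `app` header is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchReceiverSketch {

    /** Hypothetical stand-in for the LogEntry class from the question. */
    static final class LogEntry {
        final String entityType;
        final String action;

        LogEntry(String entityType, String action) {
            this.entityType = entityType;
            this.action = action;
        }
    }

    /** Hypothetical bulk variant of the post-process service. */
    interface BulkPostProcessor {
        void postProcessAll(List<LogEntry> entries);
    }

    private final BulkPostProcessor processor;

    BatchReceiverSketch(BulkPostProcessor processor) {
        this.processor = processor;
    }

    /** Validates every payload first, then processes the whole batch in one call. */
    void handleBatch(List<?> payloads) {
        List<LogEntry> entries = new ArrayList<>(payloads.size());
        for (Object payload : payloads) {
            if (!(payload instanceof LogEntry)) {
                throw new IllegalArgumentException("Unknown data type has been received.");
            }
            entries.add((LogEntry) payload);
        }
        processor.postProcessAll(entries);
    }

    public static void main(String[] args) {
        BatchReceiverSketch receiver = new BatchReceiverSketch(
                entries -> System.out.println("processed " + entries.size() + " entries"));
        receiver.handleBatch(Arrays.asList(
                new LogEntry("order", "create"),
                new LogEntry("order", "update")));
    }
}
```

Validating the whole list before calling the service means one bad payload rejects the batch before any bulk work starts; whether that is the desired behaviour depends on the application.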