I have a remote service that I'm calling to load pricing data for a product, when a specific event occurs. Once loaded, the product pricing is then broadcast for another consumer to process elsewhere.
Rather than call the remote service on every event, I'd like to batch the events into small groups, and send them in one go.
I've cobbled together the following pattern based on an Aggregator. Although it works, lots of it 'smells' -- especially my SimpleCollatingAggregator. I'm new to Spring Integration, and EIP in general, and suspect I'm misusing components.
My code is triggered from elsewhere in the application by calling a method on the @Gateway below:
public interface ProductPricingGateway {
    @Gateway(requestChannel="product.pricing.outbound.requests")
    public void broadcastPricing(ProductIdentifer productIdentifier);
}
This is then wired to an aggregator, as follows:
<int:channel id="product.pricing.outbound.requests" />
<int:channel id="product.pricing.outbound.requests.batch" />
<int:aggregator input-channel="product.pricing.outbound.requests"
    output-channel="product.pricing.outbound.requests.batch" release-strategy="releaseStrategy"
    ref="collatingAggregator" method="collate"
    correlation-strategy-expression="0"
    expire-groups-upon-completion="true"
    send-partial-result-on-expiry="true"/>
<bean id="collatingAggregator" class="com.mangofactory.pricing.SimpleCollatingAggregator" />
<bean id="releaseStrategy" class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    <!-- Release when: 10 Messages ... or ... -->
    <constructor-arg index="0" value="10" />
    <!-- ... 5 seconds since first request -->
    <constructor-arg index="1" value="5000" />
</bean>
Here's the aggregator implementation:
public class SimpleCollatingAggregator {
    public List<?> collate(List<?> input)
    {
        return input;
    }
}
Finally, this gets consumed by the following @ServiceActivator:
@ServiceActivator(inputChannel="product.pricing.outbound.requests.batch")
public void fetchPricing(List<ProductIdentifer> identifiers)
{
    // omitted
}
Note: In practice, I'm also using @Async, to keep the calling code as quick-to-return as possible. I have a bunch of questions about that too, which I'll move to a separate question.
Question 1: Given what I'm trying to achieve, is an aggregator pattern an appropriate choice here? This feels like a lot of boilerplate -- is there a better way?
Question 2: I'm using a fixed correlation value of 0 to effectively say: 'It doesn't matter how you group these messages, take 'em as they come.' Is this an appropriate way of achieving this?
Question 3: SimpleCollatingAggregator simply looks wrong to me. I want this to receive my individual inbound ProductIdentifier objects, group them into batches, and then pass them along. This works, but is it appropriate? Are there better ways of achieving the same thing?
Q1: Yes, but see Q3 and the further discussion below.
Q2: That is the correct way to say 'no correlation needed' (but you need expire-groups-upon-completion, which you have).
Q3: In this case, you don't need a custom aggregator; just use the default (remove the ref and method attributes).
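As a rough sketch of what Q3 suggests, this is the aggregator element from the question with only the ref and method attributes dropped. Everything else is copied from the question unchanged. With no custom bean, the default aggregator should hand the downstream consumer a collection of the grouped payloads, so the existing fetchPricing(List<ProductIdentifer>) signature ought to keep working, though that is worth verifying against your Spring Integration version.

```xml
<!-- Same aggregator as in the question, minus ref/method: the default aggregator is used -->
<int:aggregator input-channel="product.pricing.outbound.requests"
    output-channel="product.pricing.outbound.requests.batch"
    release-strategy="releaseStrategy"
    correlation-strategy-expression="0"
    expire-groups-upon-completion="true"
    send-partial-result-on-expiry="true"/>
```

The collatingAggregator bean definition and the SimpleCollatingAggregator class can then be deleted.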
Note that the aggregator is a passive component; the release is triggered by the arrival of a new message; hence the second part of your release strategy will only kick in when a new message arrives (it won't spontaneously release the group after 5 seconds).
However, you can configure a MessageGroupStoreReaper for that purpose: http://static.springsource.org/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
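A minimal sketch of such a reaper setup, under some assumptions: the bean ids messageStore, reaper and scheduler are made up for illustration, the 5000 ms timeout simply mirrors the release strategy in the question, the 1000 ms polling rate is an arbitrary choice, and the task namespace is assumed to be declared in the context file. The aggregator has to point at the same message store that the reaper expires, and because send-partial-result-on-expiry is already "true", an expired group should be sent on as a partial batch.

```xml
<!-- Hypothetical bean id; the aggregator and the reaper must share this store -->
<bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore" />

<int:aggregator input-channel="product.pricing.outbound.requests"
    output-channel="product.pricing.outbound.requests.batch"
    release-strategy="releaseStrategy"
    message-store="messageStore"
    correlation-strategy-expression="0"
    expire-groups-upon-completion="true"
    send-partial-result-on-expiry="true"/>

<!-- Expires groups older than 5 seconds (assumed to match the release strategy timeout) -->
<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore" />
    <property name="timeout" value="5000" />
</bean>

<!-- Runs the reaper once a second; the rate is an assumption, tune as needed -->
<task:scheduler id="scheduler" />
<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="1000" />
</task:scheduled-tasks>
```

With this arrangement a batch can wait up to roughly the timeout plus one polling interval before it is released, so the two values are worth choosing together.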