Spring integration - Appropriate pattern for collating/batching service calls

I have a remote service that I'm calling to load pricing data for a product, when a specific event occurs. Once loaded, the product pricing is then broadcast for another consumer to process elsewhere.

Rather than call the remote service on every event, I'd like to batch the events into small groups, and send them in one go.

I've cobbled together the following pattern based on an Aggregator. Although it works, lots of it 'smells' -- especially my SimpleCollatingAggregator. I'm new to Spring Integration, and EIP in general, and suspect I'm misusing components.

The Code

My code is triggered from elsewhere in the application by calling a method on the following @Gateway:

public interface ProductPricingGateway {    
    @Gateway(requestChannel="product.pricing.outbound.requests")
    public void broadcastPricing(ProductIdentifer productIdentifier);
}

This is then wired to an aggregator, as follows:

<int:channel id="product.pricing.outbound.requests" />
<int:channel id="product.pricing.outbound.requests.batch" />
<int:aggregator input-channel="product.pricing.outbound.requests"
    output-channel="product.pricing.outbound.requests.batch"
    release-strategy="releaseStrategy"
    ref="collatingAggregator" method="collate"
    correlation-strategy-expression="0"
    expire-groups-upon-completion="true"
    send-partial-result-on-expiry="true"/>
<bean id="collatingAggregator" class="com.mangofactory.pricing.SimpleCollatingAggregator" />
<bean id="releaseStrategy" class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    <!-- Release when: 10 Messages ... or ... -->
    <constructor-arg index="0" value="10" />
    <!-- ... 5 seconds since first request -->
    <constructor-arg index="1" value="5000" />
</bean>

Here's the aggregator implementation:

public class SimpleCollatingAggregator {

    public List<?> collate(List<?> input)
    {
        return input;
    }

}

Finally, this gets consumed on the following @ServiceActivator:

@ServiceActivator(inputChannel="product.pricing.outbound.requests.batch")
public void fetchPricing(List<ProductIdentifer> identifiers)
{
        // omitted
}

Note: In practice, I'm also using @Async, to keep the calling code as quick-to-return as possible. I have a bunch of questions about that too, which I'll move to a separate question.

Question 1: Given what I'm trying to achieve, is an aggregator pattern an appropriate choice here? This feels like a lot of boilerplate -- is there a better way?

Question 2: I'm using a fixed correlation value of 0 (correlation-strategy-expression="0"), to effectively say: 'It doesn't matter how you group these messages, take 'em as they come.'

Is this an appropriate way of achieving this?

Question 3: SimpleCollatingAggregator simply looks wrong to me.

I want this to receive my individual inbound ProductIdentifier objects, group them into batches, and then pass them along. This works, but is it appropriate? Are there better ways of achieving the same thing?

Marty Pitt asked Oct 22 '22 13:10


1 Answer

Q1: Yes, but see Q3 and the further discussion below.
Q2: That is the correct way to say 'no correlation needed'. You do need expire-groups-upon-completion="true" with it (which you have), so that the single group is removed after each release and later messages start a fresh group.
Q3: In this case, you don't need a custom aggregator; just use the default one (remove the ref and method attributes).

Note that the aggregator is a passive component: the release strategy is evaluated only when a new message arrives. The timeout part of your release strategy will therefore only kick in on the next arrival; the group won't be released spontaneously after 5 seconds.

However, you can configure a MessageGroupStoreReaper for that purpose: http://static.springsource.org/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
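To make the difference concrete, here is a minimal plain-Java model of a single message group. It is only a sketch and does not use the Spring Integration API: the class BatchGroup and its methods onMessage and reap are hypothetical names invented for this illustration. onMessage mimics a count-or-timeout release strategy that is checked only when a message arrives, and reap mimics what a periodically scheduled reaper adds on top.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Toy model of one aggregator group (hypothetical; not Spring Integration API). */
final class BatchGroup<T> {
    private final int maxSize;          // release when this many items are pending
    private final long timeoutMillis;   // ... or when the oldest item is this old
    private final List<T> pending = new ArrayList<>();
    private long firstArrival = -1;     // arrival time of the oldest pending item

    BatchGroup(int maxSize, long timeoutMillis) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Passive behaviour: the release check runs only because a message arrived. */
    Optional<List<T>> onMessage(T item, long now) {
        if (pending.isEmpty()) {
            firstArrival = now;
        }
        pending.add(item);
        boolean full = pending.size() >= maxSize;
        boolean timedOut = now - firstArrival >= timeoutMillis;
        return (full || timedOut) ? Optional.of(drain()) : Optional.empty();
    }

    /** Reaper behaviour: called on a schedule, releases a stale partial batch. */
    Optional<List<T>> reap(long now) {
        boolean stale = !pending.isEmpty() && now - firstArrival >= timeoutMillis;
        return stale ? Optional.of(drain()) : Optional.empty();
    }

    // Hand back the current batch and reset the group so the next item starts a new one.
    private List<T> drain() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        firstArrival = -1;
        return batch;
    }
}
```

With maxSize 10 and timeoutMillis 5000, two items arriving at t=0 and t=1000 stay pending indefinitely if nothing else arrives. Only a scheduled reap call at or after t=5000 flushes them, which is the role the MessageGroupStoreReaper plays for the real aggregator.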

Gary Russell answered Nov 03 '22 22:11