Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Integration Java DSL -- Configuration of aggregator

Tags:

I have a very simple integration flow, where a RESTful request is forwarded to two providers using a publish-subscribe channel. The result from both RESTful services is then aggregated in a single array. The sketch of the integration flow is as shown below:

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                ).subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class)
                        )
                )
            )
            .aggregate()
            .get();
}

However, when running my code, the resulting array contains the items returned by only one of the RESTful services. Is there any configuration step I am missing?

UPDATE

The following version corresponds to the full solution, taking into account Artem's comments.

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather"))
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g ->  new GenericMessage<ItemDTO[]>(
                        g.getMessages().stream()
                                .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                                .collect(Collectors.toList()).toArray(new ItemDTO[0]))))
            .get();
}
like image 480
user3329862 Avatar asked Apr 20 '16 11:04

user3329862


People also ask

What is DSL in Spring Integration?

The Java DSL for Spring Integration is essentially a facade for Spring Integration. The DSL provides a simple way to embed Spring Integration Message Flows into your application by using the fluent Builder pattern together with existing Java configuration from Spring Framework and Spring Integration.

What are the algorithms that are paired with an aggregator called in Spring Integration?

Two related concepts are correlation and release. Correlation determines how messages are grouped for aggregation. In Spring Integration, correlation is done by default, based on the IntegrationMessageHeaderAccessor. CORRELATION_ID message header.

What is service activator in Spring Integration?

The service activator is the endpoint type for connecting any Spring-managed object to an input channel so that it may play the role of a service. If the service produces output, it may also be connected to an output channel.

What is Bridge in Spring Integration?

A bridge in Spring Integration is used to connect two message channels or adapters if for any reason they can't connect directly.


1 Answers

Actually it doesn't work that way.

The .aggregate() is a third subscriber to that publishSubscribeChannel.

You have to sever your flow to two of them. Like this:

    @Bean
    public IntegrationFlow publishSubscribeFlow() {
        return flow -> flow
                .publishSubscribeChannel(s -> s
                        .applySequence(true)
                        .subscribe(f -> f
                                .handle((p, h) -> "Hello")
                                .channel("publishSubscribeAggregateFlow.input"))
                        .subscribe(f -> f
                                .handle((p, h) -> "World!")
                                .channel("publishSubscribeAggregateFlow.input"))
                );
    }

    @Bean
    public IntegrationFlow publishSubscribeAggregateFlow() {
        return flow -> flow
                .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                        .stream()
                        .<String>map(m -> (String) m.getPayload())
                        .collect(Collectors.joining(" "))))
                .channel(c -> c.queue("subscriberAggregateResult"));
    }

Pay attention, please, to the .channel("publishSubscribeAggregateFlow.input") usage from both subscribers.

To be honest that is a point of any publish-subscribe. We must know where to send the result of all subscribers if we are going to aggregate them.

Your use-case recalls me the Scatter-Gather EIP pattern.

We don't have its implementation in the DSL yet. Feel free to raise a GH issue on the matter and we will try to handle it in the upcoming 1.2 version.

UPDATE

The GH issue on the matter: https://github.com/spring-projects/spring-integration-java-dsl/issues/75

like image 58
Artem Bilan Avatar answered Sep 28 '22 02:09

Artem Bilan