I have a very simple integration flow where a RESTful request is forwarded to two providers using a publish-subscribe channel. The results from both RESTful services are then aggregated into a single array. A sketch of the integration flow is shown below:
@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                    .subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider1.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class)))
                    .subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider2.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class))))
            .aggregate()
            .get();
}
However, when running my code, the resulting array contains the items returned by only one of the RESTful services. Is there any configuration step I am missing?
UPDATE
The following version corresponds to the full solution, taking into account Artem's comments.
@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                    .subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider1.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class))
                            .channel("inputChannel-gather"))
                    .subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider2.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class))
                            .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
                    g.getMessages().stream()
                            .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                            .collect(Collectors.toList())
                            .toArray(new ItemDTO[0]))))
            .get();
}
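For completeness, a minimal sketch of how the scatter flow can be invoked; the ItemGateway interface below is an assumption added for illustration and is not part of the original configuration. The gateway's reply-channel header travels with the message, so the aggregated ItemDTO[] produced by gatherFlow finds its way back to the caller once both providers have replied (with applySequence(true), the default release strategy waits for the full sequence of two messages):

// Hypothetical messaging gateway, shown only to illustrate how the flow is triggered.
@MessagingGateway
public interface ItemGateway {

    // Sends the request to the scatter channel; the aggregated ItemDTO[]
    // from gatherFlow is returned via the reply-channel header.
    @Gateway(requestChannel = "inputChannel-scatter")
    ItemDTO[] fetchItems(Object request);
}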
The Java DSL for Spring Integration is essentially a facade for Spring Integration. The DSL provides a simple way to embed Spring Integration Message Flows into your application by using the fluent Builder pattern together with existing Java configuration from Spring Framework and Spring Integration.
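For example, a basic flow built with the fluent builder might look like this (the channel names and the transformer are illustrative only):

@Bean
public IntegrationFlow upperCaseFlow() {
    // Consume from "textIn", upper-case the String payload, and send the result to "textOut".
    return IntegrationFlows.from("textIn")
            .<String, String>transform(String::toUpperCase)
            .channel("textOut")
            .get();
}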
Two related concepts are correlation and release. Correlation determines how messages are grouped for aggregation; release determines when a group is considered complete and can be processed. In Spring Integration, correlation is based by default on the IntegrationMessageHeaderAccessor.CORRELATION_ID message header.
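If the defaults are not suitable, both strategies can be overridden on the aggregator spec; the header name and group size below are made up for the example:

@Bean
public IntegrationFlow customAggregationFlow() {
    return IntegrationFlows.from("itemsIn")
            .aggregate(a -> a
                    // Group messages by a custom header instead of the default CORRELATION_ID.
                    .correlationStrategy(m -> m.getHeaders().get("batchId"))
                    // Release a group as soon as it contains two messages.
                    .releaseStrategy(g -> g.size() == 2))
            .channel("itemsOut")
            .get();
}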
The service activator is the endpoint type for connecting any Spring-managed object to an input channel so that it may play the role of a service. If the service produces output, it may also be connected to an output channel.
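In the Java DSL a service activator is typically expressed with .handle(); the ItemService bean and its process() method below are hypothetical:

@Bean
public IntegrationFlow serviceActivatorFlow(ItemService itemService) {
    // itemService is a hypothetical Spring-managed bean; the return value of its
    // process() method (the service output) is forwarded to "resultChannel".
    return IntegrationFlows.from("requestChannel")
            .handle(itemService, "process")
            .channel("resultChannel")
            .get();
}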
A bridge in Spring Integration is used to connect two message channels or adapters if for any reason they can't connect directly.
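A sketch of a bridge in the DSL (the channel names and poller interval are illustrative; "incomingQueue" is assumed to be a QueueChannel bean):

@Bean
public IntegrationFlow bridgeFlow() {
    // Polls the queue channel "incomingQueue" and passes messages unchanged
    // to the direct channel "processingChannel".
    return IntegrationFlows.from("incomingQueue")
            .bridge(e -> e.poller(Pollers.fixedDelay(500)))
            .channel("processingChannel")
            .get();
}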
Actually it doesn't work that way. The .aggregate() is a third subscriber to that publishSubscribeChannel. You have to split your flow into two, like this:
@Bean
public IntegrationFlow publishSubscribeFlow() {
    return flow -> flow
            .publishSubscribeChannel(s -> s
                    .applySequence(true)
                    .subscribe(f -> f
                            .handle((p, h) -> "Hello")
                            .channel("publishSubscribeAggregateFlow.input"))
                    .subscribe(f -> f
                            .handle((p, h) -> "World!")
                            .channel("publishSubscribeAggregateFlow.input")));
}

@Bean
public IntegrationFlow publishSubscribeAggregateFlow() {
    return flow -> flow
            .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                    .stream()
                    .<String>map(m -> (String) m.getPayload())
                    .collect(Collectors.joining(" "))))
            .channel(c -> c.queue("subscriberAggregateResult"));
}
Please pay attention to the .channel("publishSubscribeAggregateFlow.input") usage in both subscribers. To be honest, that is the point of any publish-subscribe: we must know where to send the results of all subscribers if we are going to aggregate them.
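For illustration, the two flows can then be exercised like this (the autowired fields and the demo method are assumptions, not part of the answer):

@Autowired
@Qualifier("publishSubscribeFlow.input")
private MessageChannel publishSubscribeFlowInput;

@Autowired
@Qualifier("subscriberAggregateResult")
private PollableChannel subscriberAggregateResult;

public void demo() {
    // Send one message into the publish-subscribe flow...
    publishSubscribeFlowInput.send(MessageBuilder.withPayload("test").build());
    // ...and poll the aggregated result from the queue channel.
    Message<?> result = subscriberAggregateResult.receive(10000);
    System.out.println(result.getPayload()); // "Hello World!"
}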
Your use case reminds me of the Scatter-Gather EIP pattern. We don't have its implementation in the DSL yet. Feel free to raise a GH issue on the matter and we will try to address it in the upcoming 1.2 version.
UPDATE
The GH issue on the matter: https://github.com/spring-projects/spring-integration-java-dsl/issues/75
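(Note for readers on newer versions: the DSL later gained a scatterGather() operator; the following is only a rough sketch of how the example above could be condensed with it, assuming a Spring Integration version where that operator is available.)

@Bean
public IntegrationFlow scatterGatherFlow() {
    return IntegrationFlows.from("inputChannel")
            .scatterGather(
                    // Scatter: broadcast to both sub-flows with sequence headers applied.
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(f -> f.handle((p, h) -> "Hello"))
                            .recipientFlow(f -> f.handle((p, h) -> "World!")),
                    // Gather: join the sub-flow results into a single String payload.
                    gatherer -> gatherer
                            .outputProcessor(g -> g.getMessages().stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.joining(" "))))
            .get();
}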