I have a service that receives differently structured messages from different message queues. With @StreamListener conditions,
we can choose, per message type, how each message should be handled. As an example:
We receive two different types of messages, which have different header fields and values, e.g.
Incoming from "order" queue:
Order1: { Header: {catalog:groceries} }
Order2: { Header: {catalog:tools} }
Incoming from "shipment" queue:
Shipment1: { Header: {region:Europe} }
Shipment2: { Header: {region:America} }
There is a binding for each queue, and with the corresponding @StreamListener
I can process the messages differently by catalog and region,
e.g.
@StreamListener(target = OrderSink.ORDER_CHANNEL, condition = "headers['catalog'] == 'groceries'")
public void onGroceriesOrder(GroceryOrder order){
...
}
So the question is, how to achieve this with the new Spring Cloud Function approach?
The documentation at https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/spring-cloud-stream.html#_event_routing mentions:
Also, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers (or message) as well ….routing-expression=headers['type']
Is it possible to add the routing-expression to the binding like this (in application.yml)?
onGroceriesOrder-in-0:
  destination: order
  routing-expression: "headers['catalog']==groceries"
EDIT after first answer: If the above expression is not possible at this location, as the first answer implies, then my question is as follows:
As far as I understand, an expression like routing-expression: headers['catalog']
must be set globally, because the result maps to certain (consumer) functions.
How can I make sure that the two different messages on each queue are forwarded to their own consumer function, e.g.
Order1 --> MyOrderService.onGroceriesOrder()
Order2 --> MyOrderService.onToolsOrder()
Shipment1 --> MyShipmentService.onEuropeShipment()
Shipment2 --> MyShipmentService.onAmericaShipment()
That was easy with @StreamListener, because each method gets its own @StreamListener annotation with a different condition. How can this be achieved with the new routing-expression setting?
The above is not a valid expression, but I think you meant headers['catalog']==groceries. If so, what would you expect to happen when it is evaluated, given that the only two possible results are true and false? These questions are rhetorical, but they help to understand the problem and how to fix it.
The expression must evaluate to the name of the function to route to. So:
routing-expression: headers['catalog']
- assumes that the actual value of the catalog header is the name of the function to invoke.
routing-expression: headers['catalog'] == 'groceries' ? 'processGroceries' : 'processOther'
- maps the value 'groceries' to the 'processGroceries' function and any other value to 'processOther'.
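To make the idea concrete, here is a minimal plain-Java sketch of "the expression result is a function name that gets looked up". It does not use Spring and is not how Spring Cloud Function is implemented internally. Msg, Router, LOG and demoRouter are hypothetical names invented for illustration, and the routing rule is a hand-written lambda standing in for the SpEL expression above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class RoutingSketch {

    /** Hypothetical stand-in for a message: headers plus a payload. */
    static final class Msg {
        final Map<String, String> headers;
        final String payload;

        Msg(Map<String, String> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    /** Invokes the consumer whose registered name the routing rule returns. */
    static final class Router {
        private final Map<String, Consumer<Msg>> functions = new HashMap<>();
        private final Function<Msg, String> routingRule;

        Router(Function<Msg, String> routingRule) {
            this.routingRule = routingRule;
        }

        Router register(String name, Consumer<Msg> fn) {
            functions.put(name, fn);
            return this;
        }

        /** Routes the message and returns the name of the function that handled it. */
        String route(Msg msg) {
            String name = routingRule.apply(msg);
            Consumer<Msg> target = functions.get(name);
            if (target == null) {
                throw new IllegalStateException("No function named " + name);
            }
            target.accept(msg);
            return name;
        }
    }

    /** Records which handler saw which payload, so the routing is observable. */
    static final List<String> LOG = new ArrayList<>();

    static Router demoRouter() {
        // Plain-Java counterpart of:
        // headers['catalog'] == 'groceries' ? 'processGroceries' : 'processOther'
        Function<Msg, String> rule = m ->
                "groceries".equals(m.headers.get("catalog")) ? "processGroceries" : "processOther";
        return new Router(rule)
                .register("processGroceries", m -> LOG.add("groceries:" + m.payload))
                .register("processOther", m -> LOG.add("other:" + m.payload));
    }

    public static void main(String[] args) {
        Router router = demoRouter();
        router.route(new Msg(Map.of("catalog", "groceries"), "Order1"));
        router.route(new Msg(Map.of("catalog", "tools"), "Order2"));
        System.out.println(LOG); // prints [groceries:Order1, other:Order2]
    }
}
```

The rule returns a name rather than true or false, which is why a bare comparison such as headers['catalog']==groceries cannot work as a routing expression: a boolean does not identify any function.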