I have a spring-cloud-stream application with a Kafka binding. I would like to send and receive messages on the same topic from within the same executable (jar). My channel definitions are as follows:
public interface ChannelDefinition {
    @Input("forum")
    public SubscribableChannel readMessage();

    @Output("forum")
    public MessageChannel postMessage();
}
I use @StreamListener to receive messages. I get all sorts of unexpected errors. At times, I receive
Is there a working example of the above use case?
In addition to the answer by Marius Bogoevici, here's an example of how to listen on that input channel.
@StreamListener
public void handleNewOrder(@Input("input") SubscribableChannel input) {
    logger.info("Subscribing...");
    input.subscribe((message) -> {
        logger.info("Received new message: {}", message);
    });
}
This is the wrong way to define bindable channels, because the name forum is used for both. We should be more thorough and fail fast on it, but you're binding both the input and the output to the same channel and creating a competing consumer within your application. That also explains your other issue with alternate messages.
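To make the "alternate messages" symptom concrete, here is a minimal plain-Java sketch. It is not Spring's implementation: RoundRobinChannel and CompetingConsumerDemo are hypothetical names, and the sketch assumes the shared channel hands each message to exactly one subscriber in round-robin order. Under that assumption, once a second handler is subscribed to the same channel, the listener only sees every other message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of a shared point-to-point channel. NOT Spring code:
// all names here are hypothetical and exist only for illustration.
class CompetingConsumerDemo {

    /** Hands each message to exactly one subscriber, rotating between them. */
    static class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            subscribers.get(next).accept(message);
            next = (next + 1) % subscribers.size();
        }
    }

    /** Sends m1..mN through one shared channel and returns what the listener saw. */
    static List<String> run(int count) {
        RoundRobinChannel forum = new RoundRobinChannel();
        List<String> seenByListener = new ArrayList<>();
        List<String> takenByOtherHandler = new ArrayList<>();

        // The application's own listener.
        forum.subscribe(seenByListener::add);
        // Stand-in for the second handler that ends up on the same channel
        // when input and output share one name (an assumption of this model).
        forum.subscribe(takenByOtherHandler::add);

        for (int i = 1; i <= count; i++) {
            forum.send("m" + i);
        }
        return seenByListener;
    }

    public static void main(String[] args) {
        System.out.println(run(4)); // prints [m1, m3]
    }
}
```

With two distinct channels, each has its own subscriber, so nothing competes with the listener for messages.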
What you should do is:
public interface ChannelDefinition {
    // With no explicit value, the channel name defaults to the method name.
    @Input
    public SubscribableChannel readMessage();

    @Output
    public MessageChannel postMessage();
}
And then use application properties to bind both channels to the same destination (here, the Kafka topic forum):
spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum