
How to send and receive from the same topic within Spring Cloud Stream and Kafka

I have a spring-cloud-stream application with the Kafka binder. I would like to send and receive messages on the same topic from within the same executable (jar). My channel definitions are as follows:

public interface ChannelDefinition {

    @Input("forum")
    public SubscribableChannel readMessage();

    @Output("forum")
    public MessageChannel postMessage();
}

I use @StreamListener to receive messages, and I get all sorts of unexpected behavior. At times I see:

  1. "No dispatcher found for unknown.message.channel" for every other message.
  2. If I attach a command-line Kafka subscriber to the forum topic, it receives every other message.
  3. My application receives every other message, and these are exactly the messages the command-line subscriber does not receive. I have made sure that my application subscribes under a specific group name.

Is there a working example of the above use case?

Jimm asked Jul 21 '16 02:07


2 Answers

Along with the answer by Marius Bogoevici, here's an example of how to listen to that input.

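// Declarative form: the method is handed the bound input channel so it can subscribe to it.
// "input" must match the channel name declared with @Input (for example, "readMessage"
// for the ChannelDefinition interface in the other answer).
@StreamListener
public void handleNewOrder(@Input("input") SubscribableChannel input) {
    logger.info("Subscribing...");
    input.subscribe((message) -> {
        logger.info("Received new message: {}", message);
    });
}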
Tony Zampogna answered Sep 30 '22 11:09


This is the wrong way to define bindable channels, because the name forum is used for both. We should be more thorough and fail fast on it, but as written you are binding both the input and the output to the same channel, which creates a competing consumer within your application. That also explains your other issue with receiving only alternate messages.
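To make the "competing consumer" point concrete, here is a toy model in plain Java. It is only a sketch: `RoundRobinChannel` is a hypothetical stand-in, not a Spring class, and it assumes the shared channel hands each message to exactly one of its subscribers in rotation. Under that assumption, a listener sharing the channel with the outbound binding sees only every other message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of two handlers competing on one point-to-point channel. */
public class CompetingConsumersDemo {

    /** Hypothetical stand-in for a message channel; not part of Spring. */
    static class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("channel has no subscribers");
            }
            // Each message goes to exactly one subscriber, chosen in rotation.
            Consumer<String> target = subscribers.get(next % subscribers.size());
            next++;
            target.accept(message);
        }
    }

    /** Sends m1..mN through one shared channel and returns what the listener saw. */
    static List<String> listenerView(int count) {
        RoundRobinChannel forum = new RoundRobinChannel();
        List<String> seenByListener = new ArrayList<>();
        List<String> forwardedToBroker = new ArrayList<>();
        forum.subscribe(seenByListener::add);     // stands in for the @StreamListener method
        forum.subscribe(forwardedToBroker::add);  // stands in for the outbound binding
        for (int i = 1; i <= count; i++) {
            forum.send("m" + i);
        }
        return seenByListener;
    }

    public static void main(String[] args) {
        System.out.println(listenerView(6)); // prints [m1, m3, m5]
    }
}
```

With separate input and output channels, each channel has a single subscriber, so nothing competes for the incoming messages.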

What you should do is:

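import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ChannelDefinition {

    // With no explicit name, the channel is named after the method: "readMessage".
    @Input
    SubscribableChannel readMessage();

    // Likewise, this channel is named "postMessage".
    @Output
    MessageChannel postMessage();
}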

And then use application properties to bind both channels to the same destination (the forum topic):

spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum
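
The question mentions subscribing under a specific group name. With the channel names above, the consumer group would be set on the input binding, roughly as in this fragment. The group name `forum-app` is a placeholder, not something from the original post.

```properties
# Consumer group for the input binding; "forum-app" is a placeholder name.
spring.cloud.stream.bindings.readMessage.group=forum-app
```

A command-line consumer that uses a different group should then receive every message on the topic, independently of the application.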
Marius Bogoevici answered Sep 30 '22 09:09