
Spring rabbitmq send to exchange with dynamic binding

I am trying to use a TopicExchange to route messages by a key mask.

Config:

<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest"/>

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<rabbit:queue name="sample.queue"/>

<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>

<rabbit:listener-container connection-factory="connectionFactory" />

Component:

@Component
public class JmsComponent {

    private final Logger log = LoggerFactory.getLogger(JmsComponent.class);

    private final TopicExchange exchange = new TopicExchange("sample.exchange");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    private String received;

    public void send(String msg) {
        rabbitTemplate.convertAndSend("sample.queue", new SimpleMessage(msg));
    }

    public void bindToKey(String keyMask) {
        BindingBuilder.bind(queue).to(exchange).with(keyMask);
        rabbitTemplate.setExchange(exchange.getName());
    }


    public void sendByKey(String key, String msg) {
        rabbitTemplate.convertAndSend(exchange.getName(), key, new SimpleMessage(msg));
    }

    @RabbitListener(queues = "sample.queue")
    public void handle(SimpleMessage message) {
        log.info("================ Received  " + message.getMsg());
        received = message.getMsg();
    }

    public String getReceived() {
        return received;
    }
}

When I used send() (before introducing the TopicExchange), everything worked fine: messages were sent to the queue directly and handle() received them. With the TopicExchange, I tried to use it like this:

@Test
public void bind() throws InterruptedException {
    jmsComponent.bindToKey("qq");
    jmsComponent.sendByKey("qq", "message");
    Thread.sleep(5000);
    Assert.isTrue("message".equals(jmsComponent.getReceived()));
}

The test always fails, but in the log I see this:

DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [sample.exchange], routingKey = [qq]

What is wrong? Thanks.

Grigorichev Denis asked Feb 08 '23 18:02


2 Answers

This...

BindingBuilder.bind(queue).to(exchange).with(keyMask);

...does nothing except create a Binding object and then throw it away. You need to take that Binding object and call declareBinding on the admin. You also need to declare the exchange.

Since you have an admin in your context, the easiest thing to do is add the <rabbit:exchange/> to the context (along with the binding). See the documentation.

<rabbit:queue id="myQueue" name="sample.queue"/>

<rabbit:topic-exchange name="sample.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="myQueue" pattern="bucket.#"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

By the way, topic exchanges are intended for routing by key patterns; if you simply want to route/bind with a fixed key, like qq, then use a direct exchange. See the RabbitMQ Tutorials.
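To make the difference between a pattern and a fixed key concrete, here is a minimal sketch of the topic matching rules in plain Java. It is not the broker's or Spring AMQP's code; the class name `TopicPattern` and its `matches` method are hypothetical, and the only assumption taken from AMQP is that `*` stands for exactly one dot-separated word and `#` for zero or more words.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only: mimics how a topic exchange
// compares a binding pattern with a routing key.
final class TopicPattern {

    private TopicPattern() {}

    /** Returns true if routingKey matches pattern under topic-exchange rules. */
    static boolean matches(String pattern, String routingKey) {
        return match(split(pattern), 0, split(routingKey), 0);
    }

    // Keys and patterns are sequences of words separated by dots.
    private static List<String> split(String s) {
        return s.isEmpty() ? List.of() : Arrays.asList(s.split("\\.", -1));
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size();          // pattern used up: key must be too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.size(); next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;                  // key used up, pattern still needs a word
        }
        // '*' matches any single word; otherwise the words must be equal
        return (word.equals("*") || word.equals(k.get(j)))
                && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("bucket.#", "bucket.a.b")); // true
        System.out.println(matches("bucket.*", "bucket.a.b")); // false
        System.out.println(matches("qq", "qq"));               // true
    }
}
```

A pattern without wildcards, such as qq, only ever matches the identical routing key, which is exactly what a direct exchange does.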

Gary Russell answered Feb 12 '23 11:02


I changed my component following Gary Russell's answer:

  • I added

    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @PostConstruct
    public void init(){
        rabbitAdmin.declareExchange(exchange);
    }
    
  • and modified the bind method:

    public void bindToKey(String keyMask) {
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(keyMask);
        rabbitAdmin.declareBinding(binding); // re-declare binding if mask changed
        rabbitTemplate.setExchange(exchange.getName());
    }
    
  • and the test passed after that!

In addition, I added a change of the binding mask at runtime:

@Test
public void bind() throws InterruptedException {
    jmsComponent.bindToKey("qq");
    jmsComponent.sendByKey("qq", "message");
    Thread.sleep(5000);
    Assert.isTrue("message".equals(jmsComponent.getReceived()));

    jmsComponent.bindToKey("eeeee");
    jmsComponent.sendByKey("eeeee", "message one");
    Thread.sleep(5000);
    Assert.isTrue("message one".equals(jmsComponent.getReceived()));
}

Everything works.
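One caveat worth checking: in AMQP, declaring a second binding adds to the existing ones rather than replacing them, so after the test above the queue is presumably bound with both qq and eeeee. The toy model below illustrates that behaviour in plain Java; `ToyBindings` and its methods are hypothetical names, not the Spring AMQP API, and it uses exact key comparison because neither key contains wildcards. In the real component, the old binding would have to be removed explicitly, for example with `rabbitAdmin.removeBinding(...)`, if the mask is meant to change rather than accumulate.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for the bindings between one exchange and
// one queue. It only models that bindings accumulate until they are removed.
final class ToyBindings {

    private final Set<String> keys = new LinkedHashSet<>();

    /** Adds a binding; earlier bindings stay in place. */
    void declareBinding(String key) {
        keys.add(key);
    }

    /** Removes a binding; returns true if it existed. */
    boolean removeBinding(String key) {
        return keys.remove(key);
    }

    /** True if a message with this routing key would reach the queue. */
    boolean routes(String routingKey) {
        return keys.contains(routingKey);
    }

    public static void main(String[] args) {
        ToyBindings bindings = new ToyBindings();
        bindings.declareBinding("qq");
        bindings.declareBinding("eeeee");
        System.out.println(bindings.routes("qq"));    // true: old binding still active
        bindings.removeBinding("qq");
        System.out.println(bindings.routes("qq"));    // false
        System.out.println(bindings.routes("eeeee")); // true
    }
}
```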

Grigorichev Denis answered Feb 12 '23 11:02