I'm trying to use a TopicExchange to route messages by a key mask.
Config:
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue name="sample.queue"/>
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>
<rabbit:listener-container connection-factory="connectionFactory" />
Component:
@Component
public class JmsComponent {
private final Logger log = LoggerFactory.getLogger(JmsComponent.class);
private final TopicExchange exchange = new TopicExchange("sample.exchange");
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
private String received;
public void send(String msg) {
rabbitTemplate.convertAndSend("sample.queue", new SimpleMessage(msg));
}
public void bindToKey(String keyMask) {
BindingBuilder.bind(queue).to(exchange).with(keyMask);
rabbitTemplate.setExchange(exchange.getName());
}
public void sendByKey(String key, String msg) {
rabbitTemplate.convertAndSend(exchange.getName(), key, new SimpleMessage(msg));
}
@RabbitListener(queues = "sample.queue")
public void handle(SimpleMessage message) {
log.info("================ Received " + message.getMsg());
received = message.getMsg();
}
public String getReceived() {
return received;
}
}
When I used send() (before introducing the TopicExchange), everything worked fine: messages were sent to the queue directly and handle() received them. But with the TopicExchange I tried this:
@Test
public void bind() throws InterruptedException {
jmsComponent.bindToKey("qq");
jmsComponent.sendByKey("qq", "message");
Thread.sleep(5000);
Assert.isTrue("message".equals(jmsComponent.getReceived()));
}
The test always fails, but in the log I see this: DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [sample.exchange], routingKey = [qq]. What is wrong? Thanks.
This...
BindingBuilder.bind(queue).to(exchange).with(keyMask);
...does nothing except create a Binding object and then throw it away. You need to take that Binding object and call declareBinding on the admin. You also need to declare the exchange.
Since you have an admin in your context, the easiest thing to do is add the <rabbit:exchange/> to the context (along with the binding). See the documentation.
<rabbit:queue id="myQueue" name="sample.queue"/>
<rabbit:topic-exchange name="sample.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="myQueue" pattern="bucket.#"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
By the way, topic exchanges are intended for routing by key patterns; if you simply want to route/bind with a fixed key, like qq, then use a direct exchange. See the RabbitMQ Tutorials.
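To make the difference between a pattern and a fixed key concrete, here is a minimal sketch of how topic patterns are matched against routing keys: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The class TopicMatcher is hypothetical and is not part of Spring AMQP or the RabbitMQ client; it only approximates what the broker does and ignores edge cases such as empty keys or trailing dots.

```java
/** Hypothetical helper that mimics topic-exchange pattern matching (illustration only). */
final class TopicMatcher {
    private TopicMatcher() {}

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;              // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                      // pattern still needs a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("bucket.#", "bucket.a.b")); // true
        System.out.println(matches("bucket.*", "bucket.a.b")); // false
        System.out.println(matches("qq", "qq"));               // true
    }
}
```

A pattern without wildcards, such as qq, only ever matches the identical key, which is why a direct exchange is enough for that case.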
I changed my component following Gary Russell's answer.
I added:
@Autowired
private RabbitAdmin rabbitAdmin;
@PostConstruct
public void init(){
rabbitAdmin.declareExchange(exchange);
}
and modified the bind method:
public void bindToKey(String keyMask) {
Binding binding = BindingBuilder.bind(queue).to(exchange).with(keyMask);
rabbitAdmin.declareBinding(binding); // declares this binding on the broker; earlier bindings stay in place
rabbitTemplate.setExchange(exchange.getName());
}
and the test started passing.
I also added a change of the binding mask at runtime:
@Test
public void bind() throws InterruptedException {
jmsComponent.bindToKey("qq");
jmsComponent.sendByKey("qq", "message");
Thread.sleep(5000);
Assert.isTrue("message".equals(jmsComponent.getReceived()));
jmsComponent.bindToKey("eeeee");
jmsComponent.sendByKey("eeeee", "message one");
Thread.sleep(5000);
Assert.isTrue("message one".equals(jmsComponent.getReceived()));
}
Everything works.
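One caveat about changing the mask at runtime: declaring a second binding adds to the first one, it does not replace it. After the test above, the queue is bound with both qq and eeeee. If the old key should stop routing, the old binding has to be removed explicitly; in Spring AMQP that would be rabbitAdmin.removeBinding(...) with the earlier Binding object. The sketch below is only a toy in-memory model of that behaviour, not RabbitMQ client code. The class BindingModel and its methods are hypothetical, and it uses exact key matching, which is what wildcard-free keys like these amount to.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy stand-in for the bindings between one exchange and one queue (illustration only). */
final class BindingModel {
    private final Set<String> boundKeys = new LinkedHashSet<>();

    /** Models declaring a binding: the key is added, existing keys are kept. */
    void bind(String key) {
        boundKeys.add(key);
    }

    /** Models removing a binding. */
    void unbind(String key) {
        boundKeys.remove(key);
    }

    /** True if a message with this routing key would reach the queue. */
    boolean routes(String routingKey) {
        return boundKeys.contains(routingKey);
    }

    public static void main(String[] args) {
        BindingModel model = new BindingModel();
        model.bind("qq");
        model.bind("eeeee");                        // does not replace "qq"
        System.out.println(model.routes("qq"));     // true
        model.unbind("qq");
        System.out.println(model.routes("qq"));     // false
        System.out.println(model.routes("eeeee"));  // true
    }
}
```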