
Spring Cloud Stream with Rabbit Binder - source/sink queue names don't match

Recently, I started to play with Spring Cloud Stream and RabbitMQ binder.

If I understand correctly, when two services want to exchange messages, one configures a source to send them and the other configures a sink to receive them, and both use the same channel.

I have a channel named testchannel. I noticed, though, that the source created this RabbitMQ binding:

  • exchange testchannel,
  • routing key testchannel,
  • queue testchannel.default (durable),

while the sink created this RabbitMQ binding:

  • exchange testchannel,
  • routing key #,
  • queue testchannel.anonymous.RANDOM_ID (exclusive).

I omitted the z. prefix from these names for brevity.

When I run both applications, the first one sends a message to the testchannel exchange, and the message is routed to both queues (I assume the routing key is testchannel). The second application consumes the message from the random queue, but the copy in the default queue is never consumed.

My other problem: the second app uses only a sink, but it also creates a binding for an output channel, which is named output by default because I haven't specified anything.
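The routing described above can be reproduced with a small plain-Java simulation. This is only a sketch: it assumes the exchange is a topic exchange (which the # binding key suggests), it does not talk to RabbitMQ, and the class and method names (TopicRoutingDemo, matches, route) are made up for illustration. It shows why a message published with routing key testchannel lands in both queues.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicRoutingDemo {

    // Topic matching: '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs nothing, or absorbs one word and is tried again.
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matches(pattern, p + 1, words, w + 1);
    }

    // Returns the names of all queues whose binding key matches the routing key.
    // The map goes from queue name to binding key.
    static List<String> route(Map<String, String> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("testchannel.default", "testchannel");       // created by the source
        bindings.put("testchannel.anonymous.RANDOM_ID", "#");     // created by the sink
        // prints [testchannel.default, testchannel.anonymous.RANDOM_ID]
        System.out.println(route(bindings, "testchannel"));
    }
}
```

Both binding keys match, so each queue receives its own copy, and the copy in testchannel.default stays there because nothing consumes from that queue.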

I build both apps with the same Gradle script:

buildscript {
    ext {
        springBootVersion = '1.3.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
    compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
    }
}

First app properties:

server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

First app source code:

@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}

Second app properties:

server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

Second app source code:

@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}

So my questions are:

  • Shouldn't the source and sink use the same channel and, as a result, the same broker queue? What is the proper configuration to achieve that? (My goal is to have multiple sink service instances, with only one of them consuming each message.)
  • Should the framework create an output binding when I am using only a sink? If so, how do I disable it?
wst asked Oct 11 '25 21:10


1 Answer

By default, each consumer gets its own queue; it's a publish/subscribe scenario.

There is a notion of a consumer group, so you can have multiple instances compete for messages from the same queue.

When the producer is bound, a default queue is bound as well.

If you wish to subscribe to the default group, you have to set the group:

spring.cloud.stream.bindings.input.group=default

If you don't provide a group, you get an exclusive, auto-delete queue.
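The difference between grouped and anonymous consumers can be illustrated with a toy in-memory model. This is a sketch only, not the binder's implementation: ConsumerGroupDemo and its methods are hypothetical names, the anonymous queue suffix is a simple counter instead of a random ID, and the queue naming follows the pattern seen in the question (destination.group).

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

public class ConsumerGroupDemo {

    // Queues bound to one destination exchange, keyed by queue name.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();
    private final String destination;
    private int anonymousCounter = 0;

    ConsumerGroupDemo(String destination) {
        this.destination = destination;
    }

    // Binding a producer declares the "<destination>.default" queue.
    void bindProducer() {
        queues.putIfAbsent(destination + ".default", new ArrayDeque<>());
    }

    // With a group, instances share "<destination>.<group>";
    // without one (null), each instance gets its own anonymous queue.
    String bindConsumer(String group) {
        String name = (group == null)
                ? destination + ".anonymous." + (++anonymousCounter)
                : destination + "." + group;
        queues.putIfAbsent(name, new ArrayDeque<>());
        return name;
    }

    // Every bound queue receives its own copy of the message.
    void publish(String message) {
        for (Queue<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // A consumer instance takes the next message from its queue, or null if it is empty.
    String consume(String queueName) {
        return queues.get(queueName).poll();
    }

    public static void main(String[] args) {
        ConsumerGroupDemo broker = new ConsumerGroupDemo("testchannel");
        broker.bindProducer();
        String first = broker.bindConsumer("default");   // instance 1, group=default
        String second = broker.bindConsumer("default");  // instance 2, same group
        String loner = broker.bindConsumer(null);        // no group
        broker.publish("hello");
        System.out.println(first + " -> " + broker.consume(first));   // testchannel.default -> hello
        System.out.println(second + " -> " + broker.consume(second)); // testchannel.default -> null
        System.out.println(loner + " -> " + broker.consume(loner));   // testchannel.anonymous.1 -> hello
    }
}
```

The two instances in the default group share one queue, so only one of them sees the message, while the ungrouped consumer gets its own copy.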

EDIT

Since the default queue is durable, you should also set

spring.cloud.stream.bindings.input.durableSubscription=true

to avoid a warning when the consumer binds and to make sure the queue is durable if the consumer binds first and the queue doesn't exist yet.

Gary Russell answered Oct 15 '25 07:10