I am using RabbitMQ as a Flink DataStream source, following the example in the Flink documentation:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
    .setParallelism(1);              // non-parallel source is only required for exactly-once
This code connects to RabbitMQ and automatically declares the queue "queueName", which causes a problem. The queue already exists because I created it earlier, and I don't want Flink to try to create it again. Flink declares the queue without some of the parameters I set, which conflicts with the existing queue. Here is the exception:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'queueName' in vhost '/': received none but current is the value '604800000' of type 'long', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
... 10 more
How can I make Flink just subscribe to an existing RabbitMQ queue without trying to create a new one? Thank you all.
You can write your own class that extends RMQSource and override its setupQueue method so that it does not create the queue.
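A minimal sketch of that subclass, under a few assumptions: the class name ExistingQueueRMQSource is made up for illustration, the DeserializationSchema import path is the one used by newer Flink releases (older releases keep it under org.apache.flink.streaming.util.serialization), and your connector version exposes the protected setupQueue method together with the protected channel and queueName fields. Instead of leaving the method empty, this version does a passive declare, which only checks that the queue exists and does not compare arguments such as x-message-ttl.

```java
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

// Hypothetical subclass name; consumes a queue that was created outside of Flink.
class ExistingQueueRMQSource<OUT> extends RMQSource<OUT> {

    private static final long serialVersionUID = 1L;

    ExistingQueueRMQSource(
            RMQConnectionConfig connectionConfig,
            String queueName,
            boolean usesCorrelationId,
            DeserializationSchema<OUT> deserializationSchema) {
        super(connectionConfig, queueName, usesCorrelationId, deserializationSchema);
    }

    // The default implementation declares the queue with default arguments,
    // which is what triggers PRECONDITION_FAILED against a queue that has a TTL.
    @Override
    protected void setupQueue() throws IOException {
        // Passive declare: fails if the queue is missing, but never creates it
        // and never compares its arguments with the existing queue.
        channel.queueDeclarePassive(queueName);
    }
}
```

To use it, replace new RMQSource<String>(...) in the snippet from the question with new ExistingQueueRMQSource<String>(connectionConfig, "queueName", true, new SimpleStringSchema()). If you would rather skip the existence check entirely, an empty setupQueue body should also work.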