We have Clojure code which reads from a Rabbit queue. We would like to tolerate the case where the RabbitMQ server is down briefly, e.g. in the case of a restart (sudo service rabbitmq-server restart
).
There appears to be some provision for reconnecting in Langohr. We adapted the example clojurewerkz.langohr.examples.recovery.example1
(Gist here). Slight differences vs. the published example include the connection parameters, and the removal of the lb/publish
call (since we're filling the data with an external source).
We can successfully consume data from the queue and wait for more messages. However, when we restart RMQ (via the above sudo
command on the VM hosting RabbitMQ), the following exception is thrown:
Caught an exception during connection recovery!
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545)
at com.novemberain.langohr.Connection.recoverConnection(Connection.java:166)
at com.novemberain.langohr.Connection.beginAutomaticRecovery(Connection.java:115)
at com.novemberain.langohr.Connection.access$000(Connection.java:18)
at com.novemberain.langohr.Connection$1.shutdownCompleted(Connection.java:93)
at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
... 8 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:273)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
It seems likely that the intended restart mechanism provided by Langohr is breaking when it kicks in. Is there an alternative pattern which is preferred in the case of these "hard" restarts? Alternatively, I suppose we have to implement connection monitoring and retries ourselves. Any suggestions would be most welcome.
We used to see such stack traces, but we no longer see them with Langohr 2.9.0. After a restart, our clojure clients reconnect and messages start flowing again.
We are using the defaults, which have connection and topology coverage turned on, as shown by this:
(infof "Automatic recovery enabled? %s" (rmq/automatic-recovery-enabled? connection))
(infof "Topology recovery enabled? %s" (rmq/automatic-topology-recovery-enabled? connection))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With