I have a RabbitMQ consumer script in Go. It is a simple script from the RabbitMQ tutorial that uses the streadway/amqp library.
The problem is that if the RabbitMQ server is stopped, the consumer script does not exit; and when RabbitMQ server is restarted, the consumer does not receive messages anymore.
Is there a way to detect that the consumer connection is dead and reconnect, or at least terminate the consumer script?
I know that the library sets a default 10-second heartbeat interval for the connection; is it possible to use that somehow?
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"test_task_queue", // name
		true,              // durable
		false,             // delete when unused
		false,             // exclusive
		false,             // no-wait
		nil,               // arguments
	)
	failOnError(err, "Failed to declare a queue")

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			d.Ack(false)
			dot_count := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dot_count)
			time.Sleep(t * time.Second)
			log.Printf("Done")
		}
	}()
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}
amqp.Connection has a method NotifyClose(), which returns a channel signalling a transport or protocol error.
So something like

for { // reconnection loop
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // setup
	notify := conn.NotifyClose(make(chan *amqp.Error)) // error channel
	...
	ch, err := conn.Channel()
	msgs, err := ch.Consume(
	...
receive:
	for { // receive loop
		select { // check connection
		case err := <-notify:
			// work with error
			break receive // reconnect; a bare break would only leave the select
		case d := <-msgs:
			// work with message
			...
		}
	}
}
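The receive loop above can be tried without a broker. The following is only a sketch under assumptions: closeError is a hypothetical stand-in for amqp.Error, plain Go channels stand in for the ones returned by Consume and NotifyClose, and consume is a name made up for this example. It shows the labelled break that is needed to leave the for loop from inside a select.

```go
package main

import "fmt"

// closeError is a hypothetical stand-in for amqp.Error, so that the
// sketch runs without the library or a broker.
type closeError struct {
	Code   int
	Reason string
}

// consume handles messages until notify yields a value or msgs is closed.
// It returns the number of handled messages and the close error, if any.
func consume(msgs <-chan string, notify <-chan *closeError) (handled int, cause *closeError) {
receive:
	for {
		select {
		case cause = <-notify:
			break receive // leaves the for loop, not just the select
		case m, ok := <-msgs:
			if !ok {
				break receive // delivery channel closed
			}
			_ = m // process the message here
			handled++
		}
	}
	return handled, cause
}

func main() {
	msgs := make(chan string)
	notify := make(chan *closeError)
	go func() {
		msgs <- "a"
		msgs <- "b"
		notify <- &closeError{Code: 320, Reason: "connection lost"}
	}()
	n, cause := consume(msgs, notify)
	fmt.Println(n, cause.Reason) // prints: 2 connection lost
}
```

In real code the caller would wrap consume in the outer reconnection loop and dial again whenever it returns.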
There are a couple of ways of doing this: checking whether the delivery channel is closed, or using Channel.NotifyClose.
After starting the consumer, you will receive from the delivery channel. As you know, the receive operation may take the special form x, ok := <-ch, where ok is false when x has a zero value due to the channel being closed (and empty):
conn, _ := amqp.Dial(url)
ch, _ := conn.Channel()
delivery, _ := ch.Consume(
	queueName,
	consumerName,
	true,  // auto ack
	false, // exclusive
	false, // no local
	true,  // no wait
	nil,   // table
)
for {
	payload, ok := <-delivery
	if !ok {
		// ... channel closed
		return
	}
	_ = payload // ... handle the delivery
}
This works because the Go channel <-chan amqp.Delivery will be closed when the AMQP channel is closed or an error occurs:
[It] continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs.
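To reconnect rather than return when the delivery channel closes, the check can sit inside an outer loop. This is a rough sketch, not the library's API: session is a hypothetical function type standing in for "dial, open a channel, call Consume", deliveries are plain strings, and run, retryDelay and maxAttempts are names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnreachable is a made-up error used to simulate a broker that is down.
var errUnreachable = errors.New("broker unreachable")

// session is a hypothetical stand-in for dialing and starting a consumer.
// It returns the delivery channel, or an error if connecting failed.
type session func() (<-chan string, error)

// run consumes until maxAttempts consecutive connection attempts fail.
func run(connect session, handle func(string), retryDelay time.Duration, maxAttempts int) error {
	failures := 0
	for { // reconnection loop
		delivery, err := connect()
		if err != nil {
			failures++
			if failures >= maxAttempts {
				return fmt.Errorf("giving up after %d attempts: %w", failures, err)
			}
			time.Sleep(retryDelay)
			continue
		}
		failures = 0
		for { // receive loop
			payload, ok := <-delivery
			if !ok {
				break // delivery channel closed: go back and reconnect
			}
			handle(payload)
		}
	}
}

func main() {
	calls := 0
	connect := func() (<-chan string, error) {
		calls++
		if calls > 2 {
			return nil, errUnreachable
		}
		ch := make(chan string, 1)
		ch <- fmt.Sprintf("message from session %d", calls)
		close(ch) // simulates the broker going away
		return ch, nil
	}
	err := run(connect, func(s string) { fmt.Println(s) }, 10*time.Millisecond, 3)
	fmt.Println(err)
}
```

A fixed delay keeps the sketch short; a real consumer would probably want a growing backoff and a way to cancel the loop.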
Channel.NotifyClose
This is straightforward. And the principle is the same:
NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method.
The channel returned by NotifyClose is the same one you pass as the argument; the method only registers it internally, so you can just do:
errC := ch.NotifyClose(make(chan *amqp.Error, n))
where n is a non-zero buffer size. Make sure to pass a buffered channel to NotifyClose; otherwise, depending on how your code is structured, the library may block on send.
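The difference a buffer makes can be seen with plain Go channels. In this small sketch, closeError is a hypothetical stand-in for amqp.Error and trySend is a helper made up for the illustration; it reports whether a send would complete when no receiver is waiting yet, which is the situation the library may be in when it delivers the close notification.

```go
package main

import "fmt"

// closeError is a hypothetical stand-in for amqp.Error.
type closeError struct {
	Reason string
}

// trySend reports whether a send on c completes immediately,
// that is, without a receiver already waiting on the channel.
func trySend(c chan *closeError, e *closeError) bool {
	select {
	case c <- e:
		return true
	default:
		return false
	}
}

func main() {
	e := &closeError{Reason: "connection reset"}

	// Unbuffered: the send needs a ready receiver, so a plain send would block.
	fmt.Println(trySend(make(chan *closeError), e)) // prints: false

	// Buffer of 1: the value is stored and the sender moves on.
	fmt.Println(trySend(make(chan *closeError, 1), e)) // prints: true
}
```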
Then you can receive on the errC channel and take action depending on the type of error you get. In particular, the error is nil if the program calls conn.Close() on purpose.
To know whether the error is recoverable or not, you can inspect the amqp.Error's Code field and/or the Recover field, which is set to true in case of soft exceptions.
The following func shows how error codes can be distinguished; this is provided as additional insight. For the general case, just check Error.Recover:
const (
	ConnectionError = 1
	ChannelError    = 2
)

func isConnectionError(err *amqp.Error) bool {
	return errorType(err.Code) == ConnectionError
}

func isChannelError(err *amqp.Error) bool {
	return errorType(err.Code) == ChannelError
}

func errorType(code int) int {
	switch code {
	case
		amqp.ContentTooLarge,    // 311
		amqp.NoConsumers,        // 313
		amqp.AccessRefused,      // 403
		amqp.NotFound,           // 404
		amqp.ResourceLocked,     // 405
		amqp.PreconditionFailed: // 406
		return ChannelError
	case
		amqp.ConnectionForced, // 320
		amqp.InvalidPath,      // 402
		amqp.FrameError,       // 501
		amqp.SyntaxError,      // 502
		amqp.CommandInvalid,   // 503
		amqp.ChannelError,     // 504
		amqp.UnexpectedFrame,  // 505
		amqp.ResourceError,    // 506
		amqp.NotAllowed,       // 530
		amqp.NotImplemented,   // 540
		amqp.InternalError:    // 541
		fallthrough
	default:
		return ConnectionError
	}
}
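For the general case mentioned above, a check on Recover could look roughly like the sketch below. It rests on assumptions: closeError only mirrors the exported fields of amqp.Error so the example runs without the library, and the stop/retry/fail policy in decide is invented for the illustration, not something the library prescribes.

```go
package main

import "fmt"

// closeError mirrors the exported fields of amqp.Error; real code would
// use *amqp.Error as received from the NotifyClose channel.
type closeError struct {
	Code    int
	Reason  string
	Server  bool
	Recover bool
}

// action is a hypothetical label for what the consumer does next.
type action string

const (
	stop  action = "stop"  // closed on purpose, nothing to recover
	retry action = "retry" // soft exception, worth trying again
	fail  action = "fail"  // anything else, surface the error
)

// decide maps a value received from the close-notification channel
// to an action, looking only at nil and the Recover flag.
func decide(err *closeError) action {
	switch {
	case err == nil:
		return stop
	case err.Recover:
		return retry
	default:
		return fail
	}
}

func main() {
	fmt.Println(decide(nil))                                     // prints: stop
	fmt.Println(decide(&closeError{Code: 406, Recover: true}))   // prints: retry
	fmt.Println(decide(&closeError{Code: 320, Reason: "forced"})) // prints: fail
}
```

A long-running consumer may well choose to reconnect in the fail case too; the split here only shows where the Recover flag fits.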