
How to detect dead RabbitMQ connection?

Tags: go, rabbitmq, amqp

I have a RabbitMQ consumer script in Go. It is a simple script from the RabbitMQ tutorial that uses the streadway/amqp library.

The problem is that if the RabbitMQ server is stopped, the consumer script does not exit, and when the RabbitMQ server is restarted, the consumer no longer receives messages.

Is there a way to detect that the consumer connection is dead and reconnect, or at least terminate the consumer script?

I know that the library sets a default 10-second heartbeat interval for the connection; is it possible to use that somehow?

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test_task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            d.Ack(false)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
Alexander asked Feb 01 '17 23:02



2 Answers

amqp.Connection has a NotifyClose() method that returns a channel signalling a transport or protocol error. So something like:

for { // reconnection loop
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // setup; check err first, conn is nil if Dial fails
    notify := conn.NotifyClose(make(chan *amqp.Error)) // error channel
...
    ch, err := conn.Channel()
    msgs, err := ch.Consume(
...
receive:
    for { // receive loop
        select { // check connection
        case err = <-notify:
            // work with error
            break receive // a bare break would only leave the select; the label exits the receive loop so the outer loop reconnects
        case d := <-msgs:
            // work with message
        ...
        }
    }
}
Uvelichitel answered Sep 23 '22 02:09


There are a couple of ways of doing this: checking whether the delivery channel is closed or using Channel.NotifyClose.

Checking the delivery channel

After starting the consumer, you will receive from the delivery channel. As you know, the receive operation may take the special form x, ok := <-ch, where ok is false when x has a zero value due to the channel being closed (and empty):

conn, _ := amqp.Dial(url)
ch, _ := conn.Channel()

delivery, _ := ch.Consume(
        queueName,
        consumerName,
        true,  // auto ack
        false, // exclusive
        false, // no local
        true,  // no wait,
        nil,   // table
    )

for {
    payload, ok := <-delivery
    if !ok {
        // ... channel closed
        return
    }
    log.Printf("received: %s", payload.Body) // use payload so the snippet compiles
}

This works because the Go channel <-chan amqp.Delivery will be closed when the AMQP channel is closed or an error occurs:

[It] continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs.
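
The "closed (and empty)" detail can be checked with a plain Go channel. This is a minimal sketch with no amqp dependency; drain is a hypothetical helper, and close(ch) stands in for the library closing the delivery channel. Values already buffered are still delivered, and only then does ok become false.

```go
package main

import "fmt"

// drain receives until ch is closed and empty, and reports how many values
// arrived before that.
func drain(ch <-chan string) int {
	n := 0
	for {
		_, ok := <-ch
		if !ok {
			return n
		}
		n++
	}
}

func main() {
	ch := make(chan string, 2)
	ch <- "first"
	ch <- "second"
	close(ch) // stands in for the library closing the delivery channel

	fmt.Println(drain(ch)) // 2

	v, ok := <-ch
	fmt.Printf("%q %v\n", v, ok) // "" false
}
```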

Using Channel.NotifyClose

This is straightforward. And the principle is the same:

NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method.

The channel returned by NotifyClose is the same one you pass as the argument; the method only registers it internally, so you can simply write:

errC := ch.NotifyClose(make(chan *amqp.Error, n))

where n is a non-zero buffer size. Make sure to pass a buffered channel to NotifyClose; otherwise, depending on how your code is structured, the library may block on send.
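
Why the buffer matters can be seen with plain channels. In this sketch, trySend is a hypothetical probe written for illustration; it is not how the library sends. It reports whether a send would complete when no goroutine is receiving, which is the situation a notifier is in if your code is not yet waiting on the error channel.

```go
package main

import (
	"errors"
	"fmt"
)

// trySend reports whether a send on ch completes immediately, that is,
// without any goroutine currently waiting to receive.
func trySend(ch chan error, err error) bool {
	select {
	case ch <- err:
		return true
	default:
		return false
	}
}

func main() {
	err := errors.New("connection lost")

	unbuffered := make(chan error)
	fmt.Println(trySend(unbuffered, err)) // false: nobody is receiving yet

	buffered := make(chan error, 1)
	fmt.Println(trySend(buffered, err)) // true: the buffer holds the value
	fmt.Println(<-buffered)             // connection lost
}
```

A blocking send on the unbuffered channel would wait at that point until something received from it, whereas the buffered channel keeps the error until your code gets around to reading it.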

Then you can receive on the errC channel and take action depending on the type of error you get. In short, the error can be:

  • a connection error, usually unrecoverable
  • a channel error, also called a soft exception, usually recoverable by resetting the connection
  • nil if the program calls conn.Close() on purpose

To know whether the error is recoverable or not, you can inspect the amqp.Error's Code field and/or the Recover field, which is set to true in case of soft exceptions.

The following func shows how error codes can be distinguished — this is provided as additional insight. For the general case, just check Error.Recover:

const (
    ConnectionError = 1
    ChannelError    = 2
)

func isConnectionError(err *amqp.Error) bool {
    return errorType(err.Code) == ConnectionError
}

func isChannelError(err *amqp.Error) bool {
    return errorType(err.Code) == ChannelError
}

func errorType(code int) int {
    switch code {
    case
        amqp.ContentTooLarge,    // 311
        amqp.NoConsumers,        // 313
        amqp.AccessRefused,      // 403
        amqp.NotFound,           // 404
        amqp.ResourceLocked,     // 405
        amqp.PreconditionFailed: // 406
        return ChannelError

    case
        amqp.ConnectionForced, // 320
        amqp.InvalidPath,      // 402
        amqp.FrameError,       // 501
        amqp.SyntaxError,      // 502
        amqp.CommandInvalid,   // 503
        amqp.ChannelError,     // 504
        amqp.UnexpectedFrame,  // 505
        amqp.ResourceError,    // 506
        amqp.NotAllowed,       // 530
        amqp.NotImplemented,   // 540
        amqp.InternalError:    // 541
        fallthrough

    default:
        return ConnectionError
    }
}
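
For the general case mentioned above, checking Error.Recover, a small decision function may be enough. This is a sketch under assumptions: the local Error type only mirrors the exported fields of amqp.Error so that the example runs without the library, the action names are hypothetical, the Reason strings are illustrative, and what each action does (retry, redial, exit) is a policy you would choose yourself.

```go
package main

import "fmt"

// Error mirrors the exported fields of amqp.Error so the sketch compiles
// without the library; real code would use *amqp.Error instead.
type Error struct {
	Code    int
	Reason  string
	Server  bool
	Recover bool
}

// action is a hypothetical label for what the caller does next.
type action string

const (
	actionStop    action = "stop"    // closed on purpose, nothing to recover
	actionRecover action = "recover" // soft exception, Recover is true
	actionRedial  action = "redial"  // anything else: assume the connection is gone
)

// decide maps a value received from the NotifyClose channel to an action.
func decide(err *Error) action {
	switch {
	case err == nil:
		return actionStop
	case err.Recover:
		return actionRecover
	default:
		return actionRedial
	}
}

func main() {
	errC := make(chan *Error, 1)

	errC <- &Error{Code: 404, Reason: "NOT_FOUND", Server: true, Recover: true}
	fmt.Println(decide(<-errC)) // recover

	errC <- &Error{Code: 320, Reason: "CONNECTION_FORCED", Server: true}
	fmt.Println(decide(<-errC)) // redial

	close(errC)                 // a deliberate close sends no error; the receive yields nil
	fmt.Println(decide(<-errC)) // stop
}
```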
blackgreen answered Sep 23 '22 02:09