
RabbitMQ broken pipe error or lost messages

Using the pika library's BlockingConnection to connect to RabbitMQ, I occasionally get an error when publishing messages:

Fatal Socket Error: error(32, 'Broken pipe')

This is from a very simple sub-process that takes some information out of an in-memory queue and sends a small JSON message into AMQP. The error only seems to come up when the system hasn't sent any messages for a few minutes.

Setup:

self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
channel.exchange_declare(
    exchange='xyz',
    exchange_type='fanout',
    passive=False,
    durable=True,
    auto_delete=False
)

Enqueue code catches any connection errors and retries:

def _enqueue(self, message_id, data):
    try:
        published = self.channel.basic_publish(
            self.amqp_exchange,
            self.amqp_routing_key,
            json.dumps(data),
            pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,
                message_id=message_id
            )
        )

        # Confirm delivery or retry
        if published:
            self.retry_count = 0
        else:
            raise EnqueueException("Message publish not confirmed.")

    except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
            pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
            pika.exceptions.UnroutableError, socket.timeout):
        self.retry_count += 1
        if self.retry_count < 5:
            logging.warning("Reconnecting and resending")
            # Drop the old connection (if still open) before reconnecting
            if self.connection.is_open:
                self.connection.close()
            self.connect()
            self._enqueue(message_id, data)
        else:
            # Out of retries: re-raise with the original traceback
            raise

This sometimes works on the second attempt. More often it hangs for a while or silently drops messages before eventually raising an exception (possibly related bug report). Since it only happens when the system has been quiet for a few minutes, I'm guessing it's due to a connection timeout. But AMQP has a heartbeat mechanism, and pika reportedly uses it (related bug report).

Why do I get this error or lose messages, and why won't the connection stay open when not in use?

Matt S asked Jul 12 '17 17:07



2 Answers

From another bug report:

As BlockingConnection doesn't handle heartbeats in the background and the heartbeat_interval can't override the server's suggested heartbeat interval (that's a bug too), I suggest that heartbeats should be disabled by default (rely on TCP keep-alive instead).

If processing a task in a consume block takes longer than the server's suggested heartbeat interval, the connection will be closed by the server and the client won't be able to ack the message when it's done processing.

An update in v1.0.0 may help with the issue.
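
To illustrate the quoted point: a BlockingConnection only reads and writes heartbeat frames while the calling thread is inside one of its I/O methods. A minimal sketch, assuming pika 1.x (where BlockingConnection offers process_data_events(time_limit=...)), is to pump the connection during quiet periods from the same thread that owns it. The helper name service_connection_while_idle is hypothetical and not part of pika:

```python
import time


def service_connection_while_idle(connection, idle_seconds, step=1.0):
    """Let the connection process I/O (including heartbeats) for about idle_seconds.

    `connection` is assumed to be a pika.BlockingConnection; any object with an
    `is_open` attribute and a `process_data_events(time_limit=...)` method works.
    Returns False if the connection was found closed, True otherwise.
    """
    deadline = time.monotonic() + idle_seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return True
        if not connection.is_open:
            return False
        # Blocks for at most `time_limit` seconds while servicing socket events.
        connection.process_data_events(time_limit=min(step, remaining))
```

In pika 1.x, connection.sleep(duration) should have a similar effect, and the negotiated heartbeat can be requested with the heartbeat argument of pika.ConnectionParameters (heartbeat_interval in older releases).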

So I implemented a workaround. Every 30 seconds I publish a heartbeat message through the queue. This keeps the connection open and has the added benefit of confirming to clients that my application is up and running.
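
A rough sketch of that workaround, not my exact code: the sending loop waits on the in-memory queue for at most the heartbeat period and publishes a small heartbeat message when nothing arrives. The names publish_loop, publish and stop, and the shape of the heartbeat payload, are assumptions for illustration; publish stands in for a thin wrapper around channel.basic_publish.

```python
import json
import queue
import time


def publish_loop(work_queue, publish, heartbeat_every=30.0, stop=None):
    """Forward queued messages; publish a heartbeat whenever the queue is idle.

    `publish` is a hypothetical callable that takes a JSON string.
    `stop` is an optional threading.Event-like object checked between iterations.
    """
    while stop is None or not stop.is_set():
        try:
            # Wait for real work, but no longer than the heartbeat period.
            data = work_queue.get(timeout=heartbeat_every)
        except queue.Empty:
            # Nothing to send: emit a small heartbeat so the socket stays active.
            data = {"type": "heartbeat", "timestamp": time.time()}
        publish(json.dumps(data))
```

Because the heartbeat goes out from the same loop that sends real messages, only one thread ever touches the channel.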

Matt S answered Oct 15 '22 23:10


The Broken Pipe error means that something was written to the socket after the connection had already been closed.

As far as I can see, you have a shared "self.connection". Could it be closed earlier, or by a parallel thread?

You could also set the logging level to DEBUG and check the client's log to find the moment at which the client closes the connection.
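
For example, a minimal logging setup along these lines should surface pika's own messages (its modules log under names starting with "pika"). The format string is only a suggestion; including the thread name helps to spot a second thread touching the connection:

```python
import logging

# Send all log records (including pika's) to stderr with timestamps.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(threadName)s %(name)s %(levelname)s %(message)s",
)
# Make sure pika's loggers are not filtered at a higher level.
logging.getLogger("pika").setLevel(logging.DEBUG)
```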

Alex Pertsev answered Oct 15 '22 22:10