I am looking for a way to publish messages to a RabbitMQ server from my Django application. This is not for task offloading, so I don't want to use Celery. The purpose is to publish to the exchange from the Django application and have a sister (non-Django) application in the Docker container consume from the queue.
This all seems very straightforward; however, I can't seem to publish to the exchange without a connection being established and closed each time, even though I never explicitly ask for that to happen.
In an attempt to solve this, I have defined a class with a nested singleton class that maintains a connection to the RabbitMQ server using Pika. The idea was that the nested singleton would be instantiated only once, opening the connection at that time. Any time something is to be published to the queue, the singleton handles it.
import logging
import pika
import os

logger = logging.getLogger('django')

# rmq_username, rmq_password, rmq_host, rmq_port and rmq_vhost are
# defined elsewhere (not shown in this post).


class PikaChannelSingleton:
    class __Singleton:
        channel = pika.adapters.blocking_connection.BlockingChannel

        def __init__(self):
            self.initialize_connection()

        def initialize_connection(self):
            logger.info('Attempting to establish RabbitMQ connection')
            credentials = pika.PlainCredentials(rmq_username, rmq_password)
            parameters = pika.ConnectionParameters(rmq_host, rmq_port, rmq_vhost, credentials, heartbeat=0)
            connection = pika.BlockingConnection(parameters)
            con_chan = connection.channel()
            con_chan.exchange_declare(exchange='xchng', exchange_type='topic', durable=True)
            self.channel = con_chan

        def send(self, routing_key, message):
            # Reconnect if the channel has been closed since the last publish.
            if self.channel.is_closed:
                PikaChannelSingleton.instance.initialize_connection()
            self.channel.basic_publish(exchange='xchng', routing_key=routing_key,
                                       body=message)

    instance = None

    def __init__(self, *args, **kwargs):
        if not PikaChannelSingleton.instance:
            logger.info('Creating channel singleton')
            PikaChannelSingleton.instance = PikaChannelSingleton.__Singleton()

    @staticmethod
    def send(routing_key, message):
        PikaChannelSingleton.instance.send(routing_key, message)


rmq_connection = PikaChannelSingleton()
I then import rmq_connection where needed in the Django application. Everything works in toy applications and in the Python REPL, but in the Django application a new connection is established every time the send function is called. The connection then immediately closes with the message 'client unexpectedly closed TCP connection'. The message does get published to the exchange correctly.
So I am sure there is something going on with Django and how it handles processes. The question remains: how do I publish numerous messages to a queue without re-establishing a connection each time?
If I understand correctly, connections cannot be kept alive like that in a single-threaded context. While your Django app continues executing, the AMQP client is not sending heartbeats on the connection, so the connection will die.
You could use SelectConnection instead of BlockingConnection, but that is probably not easy in the context of Django.
A good compromise could be to simply collect messages in your singleton, but only send them all at once, with a BlockingConnection, at the very end of your Django request.
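The compromise above could be sketched roughly as follows. This is only a minimal illustration, not a tested production setup: the names queue_message, drain_messages, publish_batch and FlushMessagesMiddleware are hypothetical, the connection parameters are placeholders, and it assumes a threaded (non-async) Django deployment where one request runs on one thread. Views buffer their messages, and a middleware opens a single BlockingConnection at the end of the request to publish them.

```python
import threading

# Per-thread buffer, so concurrent requests on different worker threads
# do not mix their messages. (Assumes a threaded, non-async Django setup.)
_local = threading.local()


def queue_message(routing_key, body):
    """Remember a message to publish once the current request finishes."""
    if not hasattr(_local, "pending"):
        _local.pending = []
    _local.pending.append((routing_key, body))


def drain_messages():
    """Return the buffered messages for this thread and clear the buffer."""
    pending = getattr(_local, "pending", [])
    _local.pending = []
    return pending


def publish_batch(messages, exchange="xchng"):
    """Open one BlockingConnection, publish every message, then close it."""
    import pika  # imported lazily so the buffering code works without pika

    # Hypothetical connection settings; substitute your own parameters.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange=exchange, exchange_type="topic", durable=True)
        for routing_key, body in messages:
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
    finally:
        connection.close()


class FlushMessagesMiddleware:
    """Django-style middleware that publishes buffered messages per request."""

    def __init__(self, get_response, publish=publish_batch):
        self.get_response = get_response
        self.publish = publish  # injectable, which makes testing easier

    def __call__(self, request):
        try:
            return self.get_response(request)
        finally:
            # Always drain, so messages never leak into the next request.
            pending = drain_messages()
            if pending:
                self.publish(pending)
```

To try it, the middleware class would be added to the MIDDLEWARE setting and views would call queue_message(routing_key, body) instead of publishing directly. The cost is still one connection per request that has messages to send, but no longer one per message, and nothing has to keep an idle connection alive between requests.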