I am writing a websocket server in twisted to learn the framework. It will be receiving messages from a rabbitmq broker and sending out updates to connected clients. If I want to broadcast/multicast many messages at a time across many client connections, is calling (just as an example) deferToThread(channel.basic_consume, queue), or callInThread(" "), a very good option for doing so?
If not, what would be the twisted way of consuming messages from rabbitmq and forwarding them to connected clients?
My strategy so far is:
reactor_thread: listen on port(x) to set up and maintain client connections
other_thread: subscribe to a rabbitmq queue and consume messages, if any (goes on forever)
In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case first deliveries will happen when new messages are enqueued.
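As a rough illustration of that behaviour (the queue name and the localhost broker here are placeholders, and pika 1.x is assumed), registering a consumer looks roughly like this:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="updates")

def on_message(ch, method, properties, body):
    # Fires immediately for messages already waiting in the queue, and again
    # whenever a new message is enqueued later.
    print("got:", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="updates", on_message_callback=on_message)
channel.start_consuming()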
To increase performance and consume more messages at a time, you can run several consumers on the same queue concurrently; client stacks that manage consumers for you typically expose this as a setting such as concurrentConsumers (for instance, concurrentConsumers=10 in Spring AMQP).
To have each consumer receive the same message, you need to create a queue for each consumer and deliver the same message to each queue. The easiest way to do this is to use a fanout exchange. This will send every message to every queue that is bound to the exchange, completely ignoring the routing key.
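A minimal sketch of that pattern (again with made-up names, assuming pika 1.x): declare a fanout exchange, give each consumer its own exclusive queue, bind it, and publish to the exchange:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Fanout exchanges copy every message to every bound queue and ignore the routing key.
channel.exchange_declare(exchange="broadcast", exchange_type="fanout")

# Each consumer gets its own exclusive, server-named queue bound to the exchange.
queue_name = channel.queue_declare(queue="", exclusive=True).method.queue
channel.queue_bind(exchange="broadcast", queue=queue_name)

# Publishing to the exchange delivers a copy to every bound queue.
channel.basic_publish(exchange="broadcast", routing_key="", body=b"update for everyone")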
is calling (just as an example) deferToThread(channel.basic_consume, queue), or callInThread(" ") a very good option for doing so?
Using threads won't really provide much benefit in this situation since messages are already queued in RabbitMQ. I've been in similar situations in the past and I can give you a high-level overview of what I did to solve the problem without using threads. Disclaimer: I haven't worked with RabbitMQ or Websockets for a year or two, so my knowledge may be a bit fuzzy.
Assuming you're using autobahn for websockets, you can add a variable in the factory class (autobahn.twisted.websocket.WebSocketServerFactory) which will keep track of connected clients. Either a list or a dict will work fine.
from autobahn.twisted.websocket import WebSocketServerFactory

factory = WebSocketServerFactory()
factory.connection_list = []  # holds the protocol instance of every connected client
The connection_list variable will store protocol objects (autobahn.twisted.websocket.WebSocketServerProtocol) after a connection is made. In the protocol, you need to override the connectionMade method to append the protocol (self in this case) to self.factory.connection_list.
def connectionMade(self):
    super(WSProtocol, self).connectionMade()
    # Register this client so the broadcast code can reach it later.
    self.factory.connection_list.append(self)
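The original snippet doesn't show it, but you'll almost certainly want to drop clients from that list when they disconnect; a minimal sketch, assuming the same WSProtocol class and connection_list attribute:

def connectionLost(self, reason):
    super(WSProtocol, self).connectionLost(reason)
    # Stop broadcasting to clients that have gone away.
    if self in self.factory.connection_list:
        self.factory.connection_list.remove(self)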
It's probably best to create something like an "onConnect" deferred for flexibility, but this is the gist of it. Maybe autobahn provides an interface to do so.
Using pika, you can consume messages asynchronously by using the Twisted example from pika's documentation. Change the channel and exchange names as necessary to make it work with your setup. Then we're going to make two changes. First we'll pass factory.connection_list to the callbacks; then, when a message is consumed, we'll send it to the connected clients' protocols.
@defer.inlineCallbacks
def run(connection, proto_list):
    # ... channel setup and basic_consume, as in pika's Twisted example ...
    l = task.LoopingCall(read, queue_object, proto_list)
    l.start(0.01)

@defer.inlineCallbacks
def read(queue_object, proto_list):
    # ... ch, method, properties and body come from yielding queue_object.get() ...
    if body:
        print(body)
        for client in proto_list:
            # autobahn protocols send data with sendMessage()
            client.sendMessage(body)
    yield ch.basic_ack(delivery_tag=method.delivery_tag)

# ...
d.addCallback(run, factory.connection_list)
reactor.run()
In the read callback function, every time a message is consumed, the looping task will iterate the list of connected clients and send them the message.
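For completeness, here is a rough sketch of the glue the snippet above elides: how d can be obtained and how the websocket factory might be started. It follows the connection setup from pika's Twisted example; the host, port and URL are placeholders, and pika's Twisted API has changed between versions, so treat this as illustrative rather than definitive.

import pika
from pika.adapters import twisted_connection
from twisted.internet import defer, protocol, reactor, task  # defer/task are used by run/read above
from autobahn.twisted.websocket import WebSocketServerFactory

# Start the websocket listener; clients connecting here end up in connection_list.
factory = WebSocketServerFactory(u"ws://127.0.0.1:9000")
factory.protocol = WSProtocol          # the protocol subclass defined above
factory.connection_list = []
reactor.listenTCP(9000, factory)

# Connect to RabbitMQ with pika's Twisted adapter, as in pika's own example.
parameters = pika.ConnectionParameters(host="localhost")
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP("localhost", 5672)
d.addCallback(lambda proto: proto.ready)   # fires with the connection once the AMQP handshake completes
d.addCallback(run, factory.connection_list)
reactor.run()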