Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Need a thread-safe asynchronous message queue

I'm looking for a Python class (preferably part of the standard language, rather than a 3rd party library) to manage asynchronous 'broadcast style' messaging.

I will have one thread which puts messages on the queue (the 'putMessageOnQueue' method must not block) and then multiple other threads which will all be waiting for messages, having presumably called some blocking 'waitForMessage' function. When a message is placed on the queue I want each of the waiting threads to get its own copy of the message.

I've looked at the built-in Queue class, but I don't think this is suitable because consuming messages seems to involve removing them from the queue, so only 1 client thread would see each one.

This seems like it should be a common use-case, can anyone recommend a solution?

like image 390
codebox Avatar asked May 31 '13 12:05

codebox


People also ask

How do I make a queue thread-safe?

Thread safe means that you have to isolate any shared data. Here your shared data is the pointer to the queue.So , in general , any time you have operations on the queue you need to protect queue and prevent multiple threads reach your queue at the same time. One good way is to implement Condition Variables.

Is message queue thread-safe?

A message queue allows a sender to post messages which another thread then picks up and responds to. The posting of the message and the reading of the message is thread safe. This way, you can communicate with other threads by sending messages to the queue. The sender's job is to pass command messages to other threads.

What is asynchronous message queue?

A message queue is a form of asynchronous service-to-service communication used in serverless and microservices architectures. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer.

What is a thread message queue?

A message queue is a kernel object that implements a simple message queue, allowing threads and ISRs to asynchronously send and receive fixed-size data items.


2 Answers

I think the typical approach to this is to use a separate message queue for each thread, and push the message onto every queue which has previously registered an interest in receiving such messages.

Something like this ought to work, but it's untested code...

from time import sleep
from threading import Thread
from Queue import Queue

class DispatcherThread(Thread):

   def __init__(self, *args, **kwargs):
       super(DispatcherThread, self).__init__(*args, **kwargs)
       self.interested_threads = []

   def run(self):
       while 1:
           if some_condition:
               self.dispatch_message(some_message)
           else:
               sleep(0.1)

   def register_interest(self, thread):
       self.interested_threads.append(thread)

   def dispatch_message(self, message):
       for thread in self.interested_threads:
           thread.put_message(message)



class WorkerThread(Thread):

   def __init__(self, *args, **kwargs):
       super(WorkerThread, self).__init__(*args, **kwargs)
       self.queue = Queue()


   def run(self):

       # Tell the dispatcher thread we want messages
       dispatcher_thread.register_interest(self)

       while 1:
           # Wait for next message
           message = self.queue.get()

           # Process message
           # ...

   def put_message(self, message):
       self.queue.put(message)


dispatcher_thread = DispatcherThread()
dispatcher_thread.start()

worker_threads = []
for i in range(10):
    worker_thread = WorkerThread()
    worker_thread.start()
    worker_threads.append(worker_thread)

dispatcher_thread.join()
like image 77
Aya Avatar answered Oct 20 '22 01:10

Aya


I think this is a more straight forward example (taken from the Queue example in Python Lib )

from threading import Thread
from Queue import Queue


num_worker_threads = 2

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
like image 38
Nils Ziehn Avatar answered Oct 19 '22 23:10

Nils Ziehn