Non-blocking multiprocessing.connection.Listener?

I use multiprocessing.connection.Listener for communication between processes, and it works like a charm for me. Now I would really like my main loop to do something else between commands from the client. Unfortunately, listener.accept() blocks execution until a connection from the client process is established.

Is there a simple way to do a non-blocking check on a multiprocessing.connection? A timeout? Or should I use a dedicated thread?

    # Simplified code:
    from multiprocessing.connection import Listener

    def mainloop():
        listener = Listener(address=('localhost', 6000), authkey=b'secret')

        while True:
            conn = listener.accept()  # <--- This blocks!
            msg = conn.recv()
            print('got message: %r' % msg)
            conn.close()
asked Aug 13 '16 by igann

2 Answers

One solution I found (although it might not be the most "elegant") is to use conn.poll (documentation). poll() returns True if the connection has data available to read, and (most importantly) it does not block when called with no timeout argument. I'm not 100% sure this is the best way to do it, but I've had success running listener.accept() only once and then using the following pattern to repeatedly get input (when any is available):

    from multiprocessing.connection import Listener

    def mainloop():
        running = True
        listener = Listener(address=('localhost', 6000), authkey=b'secret')
        conn = listener.accept()
        msg = ""

        while running:
            while conn.poll():
                msg = conn.recv()
                print(f"got message: {msg}")
                if msg == "EXIT":
                    running = False

            # Other code can go here
            print(f"I can run too! Last msg received was {msg}")

        conn.close()

The inner 'while' can be replaced with 'if' if you only want to handle at most one message per pass through the main loop. Use with caution: it feels somewhat hacky, and I haven't found references to using conn.poll for this purpose elsewhere.

answered Sep 28 '22 by readjfb

I've not used the Listener object myself; for this task I normally use multiprocessing.Queue; docs at the following link:

https://docs.python.org/2/library/queue.html#Queue.Queue

That object can be used to send and receive any pickle-able object between Python processes with a nice API; I think you'll be most interested in:

  • in process A
    • .put('some message')
  • in process B
    • .get_nowait() # raises Empty if nothing is available; handle that to move on with your execution

The only limitation is that you'll need control of both Process objects at some point in order to allocate the queue to them; something like this:

    import time
    from queue import Empty
    from multiprocessing import Queue, Process


    def receiver(q):
        while 1:
            try:
                message = q.get_nowait()
                print('receiver got', message)
            except Empty:
                print('nothing to receive, sleeping')
                time.sleep(1)


    def sender(q):
        while 1:
            message = 'some message'
            q.put(message)
            print('sender sent', message)
            time.sleep(1)


    some_queue = Queue()

    process_a = Process(
        target=receiver,
        args=(some_queue,)
    )

    process_b = Process(
        target=sender,
        args=(some_queue,)
    )

    process_a.start()
    process_b.start()

    print('ctrl + c to exit')
    try:
        while 1:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    process_a.terminate()
    process_b.terminate()

    process_a.join()
    process_b.join()

Queues are nice because you can actually have as many consumers and as many producers for that exact same Queue object as you like (handy for distributing tasks).

I should point out that just calling .terminate() on a Process is bad form; you should use your shiny new messaging system to pass a shutdown message or something of that nature.

answered Sep 28 '22 by initialed85