I'm trying to start a data queue server under a managing process (so that it can later be turned into a service), and while the data queue server function works fine in the main process, it does not work in a process created using multiprocessing.Process.
The dataQueueServer and dataQueueClient code is based on the code from the multiprocessing module documentation here.
When run on its own, dataQueueServer works well. However, when run using a multiprocessing.Process
's start()
in mpquueue, it doesn't work (when tested with the client). I am using the dataQueueClient without changes to test both cases.
The code does reach the serve_forever
in both cases, so I think the server is working, but something is blocking it from communicating back to the client in the mpqueue case.
I have placed the loop that runs the serve_forever()
part under a thread, so that it can be stoppable.
Here is the code:
mpqueue # this is the "manager" process trying to spawn the server in a child process
import time
import multiprocessing
import threading
import dataQueueServer
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueServer(multiprocessing.Process):
def __init__(self, name = '', printer = None):
multiprocessing.Process.__init__(self)
self.name = name
self.printer = printer
self.ml = dataQueueServer.MainLoop(name = 'ml', printer = self.printer)
def run(self):
self.printer.tsprint(self.ml)
self.ml.start()
def stop(self):
self.ml.stop()
if __name__ == '__main__':
printer = Printer()
qs = QueueServer(name = 'QueueServer', printer = printer)
printer.tsprint(qs)
printer.tsprint('starting')
qs.start()
printer.tsprint('started.')
printer.tsprint('Press Ctrl-C to quit')
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
printer.tsprint('\nTrying to exit cleanly...')
qs.stop()
printer.tsprint('stopped')
dataQueueServer
import time
import threading
from multiprocessing.managers import BaseManager
from multiprocessing import Queue
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
## Define some helper functions for use by the main process loop
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueManager(BaseManager):
pass
class MainLoop(threading.Thread):
"""A thread based loop manager, allowing termination signals to be sent
to the thread"""
def __init__(self, name = '', printer = None):
threading.Thread.__init__(self)
self._stopEvent = threading.Event()
self.daemon = True
self.name = name
if printer is None:
self.printer = Printer()
else:
self.printer = printer
## create the queue
self.queue = Queue()
## Add a function to the handler to return the queue to clients
self.QM = QueueManager
self.QM.register('get_queue', callable=lambda:self.queue)
self.queue_manager = self.QM(address=(HOST, PORT), authkey=AUTHKEY)
self.queue_server = self.queue_manager.get_server()
def __del__(self):
self.printer.tsprint( 'closing...')
def run(self):
self.printer.tsprint( '{}: started serving'.format(self.name))
self.queue_server.serve_forever()
def stop(self):
self.printer.tsprint ('{}: stopping'.format(self.name))
self._stopEvent.set()
def stopped(self):
return self._stopEvent.isSet()
def start():
printer = Printer()
ml = MainLoop(name = 'ml', printer = printer)
ml.start()
return ml
def stop(ml):
ml.stop()
if __name__ == '__main__':
ml = start()
raw_input("\nhit return to stop")
stop(ml)
And a client:
dataQueueClient
import datetime
from multiprocessing.managers import BaseManager
n = 0
N = 10**n
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
def now():
return datetime.datetime.now()
def gen(n, func, *args, **kwargs):
k = 0
while k < n:
yield func(*args, **kwargs)
k += 1
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
m = QueueManager(address=(HOST, PORT), authkey=AUTHKEY)
m.connect()
queue = m.get_queue()
def load(msg, q):
return q.put(msg)
def get(q):
return q.get()
lgen = gen(N, load, msg = 'hello', q = queue)
t0 = now()
while True:
try:
lgen.next()
except StopIteration:
break
t1 = now()
print 'loaded %d items in ' % N, t1-t0
t0 = now()
while queue.qsize() > 0:
queue.get()
t1 = now()
print 'got %d items in ' % N, t1-t0
So it seems like the solution is simple enough: Don't use serve_forever()
, and use manager.start()
instead.
According to Eli Bendersky, the BaseManager
(and it's extended version SyncManager
) already spawns the server in a new process (and looking at the multiprocessing.managers code confirms this). The problem I have been experiencing stems from the form used in the example, in which the server is started under the main process.
I still don't understand why the current example doesn't work when run under a child process, but that's no longer an issue.
Here's the working (and much simplified from OP) code to manage multiple queue servers:
Server:
from multiprocessing import Queue
from multiprocessing.managers import SyncManager
HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'
name0 = 'qm0'
name1 = 'qm1'
name2 = 'qm2'
description = 'Queue Server'
def CreateQueueServer(HOST, PORT, AUTHKEY, name = None, description = None):
name = name
description = description
q = Queue()
class QueueManager(SyncManager):
pass
QueueManager.register('get_queue', callable = lambda: q)
QueueManager.register('get_name', callable = name)
QueueManager.register('get_description', callable = description)
manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
manager.start() # This actually starts the server
return manager
# Start three queue servers
qm0 = CreateQueueServer(HOST, PORT0, AUTHKEY, name0, description)
qm1 = CreateQueueServer(HOST, PORT1, AUTHKEY, name1, description)
qm2 = CreateQueueServer(HOST, PORT2, AUTHKEY, name2, description)
raw_input("return to end")
Client:
from multiprocessing.managers import SyncManager
HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'
def QueueServerClient(HOST, PORT, AUTHKEY):
class QueueManager(SyncManager):
pass
QueueManager.register('get_queue')
QueueManager.register('get_name')
QueueManager.register('get_description')
manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
manager.connect() # This starts the connected client
return manager
# create three connected managers
qc0 = QueueServerClient(HOST, PORT0, AUTHKEY)
qc1 = QueueServerClient(HOST, PORT1, AUTHKEY)
qc2 = QueueServerClient(HOST, PORT2, AUTHKEY)
# Get the queue objects from the clients
q0 = qc0.get_queue()
q1 = qc1.get_queue()
q2 = qc2.get_queue()
# put stuff in the queues
q0.put('some stuff')
q1.put('other stuff')
q2.put({1:123, 2:'abc'})
# check their sizes
print 'q0 size', q0.qsize()
print 'q1 size', q1.qsize()
print 'q2 size', q2.qsize()
# pull some stuff and print it
print q0.get()
print q1.get()
print q2.get()
Adding an additional server to share a dictionary with the information of the running queue servers so that consumers can easily tell what's available where is easy enough using that model. One thing to note, though, is that the shared dictionary requires slightly different syntax than a normal dictionary: dictionary[0] = something
will not work. You need to use dictionary.update([(key, value), (otherkey, othervalue)])
and dictionary.get(key)
syntax, which propagates across to all other clients connected to this dictionary..
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With