I have implemented a "network service" superclass in Python, like so:
    import pickle
    import threading
    from datetime import datetime

    import zmq

    # log and get_pids() are assumed to be defined elsewhere in the application

    class NetworkService(threading.Thread):
        """
        Implements a multithreaded network service
        """
        # Class Globals (ie, C++ class static)
        ZmqContext = zmq.Context.instance()

        # ==========================================================================================
        # Class Mechanics
        # ==========================================================================================
        def __init__(self, name, port, conc=4):
            """
            Network service initialization
            """
            self.service_name = name
            self.service_port = port
            self.concurrency = conc
            self.handler_url = "inproc://" + name
            self.client_url = "tcp://*:" + str(port)
            self.shutdown = True  # Cleared in run()
            self.thread = {}
            super(NetworkService, self).__init__()

        # ==========================================================================================
        # Class Operation
        # ==========================================================================================
        def run(self):  # Called [only] by threading.Thread.start()
            self.shutdown = False
            clients = NetworkService.ZmqContext.socket(zmq.ROUTER)
            clients.bind(self.client_url)
            handlers = NetworkService.ZmqContext.socket(zmq.DEALER)
            handlers.bind(self.handler_url)
            for i in range(self.concurrency):
                self.thread[i] = threading.Thread(target=self.handler, name=self.service_name + str(i))
                self.thread[i].daemon = True
                self.thread[i].start()
            zmq.proxy(clients, handlers)
            clients.close()
            handlers.close()

        def terminate(self):
            self.shutdown = True

        def handler(self):
            socket = NetworkService.ZmqContext.socket(zmq.REP)
            socket.connect(self.handler_url)
            iam = repr(get_pids()[2])
            log.info("nsh@%s is up", iam)
            while not self.shutdown:
                string = socket.recv()
                toe = datetime.utcnow()
                command = pickle.loads(string)
                reply = self.protocol(command)
                string = pickle.dumps(reply)
                socket.send(string)

        def protocol(self, command):  # Override this in subclass
            reply = {}
            reply["success"] = False
            reply["detail"] = "No protocol defined (NetworkService.protocol(...) not overridden)"
            if "ident" in command:
                reply["ident"] = command["ident"]
            return reply
The problem is with the line "zmq.proxy(clients, handlers)": I can't seem to get it to end. Ever. If all of the handlers terminate, still zmq.proxy() does not return. I don't mind creating an independent thread to run the proxy, but this is in a daemon that I'd like to be able to shut down cleanly.
I read in the documentation that this is correct behavior for zmq.proxy, but it doesn't seem so correct to me ;-}.
Can anyone recommend an approximate equivalent that can be shut down once the handler threads have terminated?
The API basically implies that you have to terminate the context. You can run your proxy in a separate thread with a shared context, then terminate the context from another thread and catch zmq.ContextTerminated around the proxy call.
    try:
        zmq.proxy(self.frontend, self.backend)
    except zmq.ContextTerminated:
        # cleanup if needed; Context.term() blocks until these sockets are closed
        self.frontend.close()
        self.backend.close()
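To make the approach above concrete, here is a minimal sketch of a proxy thread and one handler thread that both exit when the context is terminated. The names run_proxy, handler, demo and the two inproc URLs are hypothetical and chosen only for illustration; the demo uses inproc on both sides so it does not need a free TCP port, and it creates a private zmq.Context() rather than zmq.Context.instance(), on the assumption that you do not want to terminate a context shared with the rest of the process.

```python
import threading

import zmq

CLIENT_URL = "inproc://demo-clients"    # hypothetical; the question uses tcp://*:<port>
HANDLER_URL = "inproc://demo-handlers"  # hypothetical


def run_proxy(ctx, ready):
    """Run a ROUTER/DEALER proxy until ctx.term() is called elsewhere."""
    clients = ctx.socket(zmq.ROUTER)
    handlers = ctx.socket(zmq.DEALER)
    # LINGER=0 so close() drops pending messages instead of delaying term()
    clients.setsockopt(zmq.LINGER, 0)
    handlers.setsockopt(zmq.LINGER, 0)
    clients.bind(CLIENT_URL)
    handlers.bind(HANDLER_URL)
    ready.set()
    try:
        zmq.proxy(clients, handlers)  # blocks until the context is terminated
    except zmq.ContextTerminated:
        pass  # expected shutdown path
    finally:
        # term() cannot return until every socket of the context is closed
        clients.close()
        handlers.close()


def handler(ctx):
    """Echo worker; a blocking recv() also raises ContextTerminated."""
    sock = ctx.socket(zmq.REP)
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(HANDLER_URL)
    try:
        while True:
            msg = sock.recv()
            sock.send(b"echo:" + msg)
    except zmq.ContextTerminated:
        pass
    finally:
        sock.close()


def demo():
    ctx = zmq.Context()  # private context, so term() affects only this demo
    ready = threading.Event()
    proxy_thread = threading.Thread(target=run_proxy, args=(ctx, ready), daemon=True)
    proxy_thread.start()
    ready.wait(timeout=5)  # make sure both endpoints are bound
    worker = threading.Thread(target=handler, args=(ctx,), daemon=True)
    worker.start()

    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)
    client.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging forever
    client.connect(CLIENT_URL)
    try:
        client.send(b"ping")
        reply = client.recv()
    finally:
        client.close()  # must be closed before term() can finish

    ctx.term()  # wakes the proxy and the worker with ContextTerminated
    proxy_thread.join(timeout=5)
    worker.join(timeout=5)
    return reply, proxy_thread.is_alive(), worker.is_alive()
```

Calling demo() should return the echoed reply together with two False flags, meaning both threads have exited. The design point to keep in mind is that term() waits for every socket created from that context, so each thread closes its own sockets in a finally block.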