So what I am doing is writing a WSGI streaming service which makes use of a Queue wrapped in an iterator to implement a multicast push. What follows is a simplified model of the service:
# this is managed by another thread
def processor_runner():
generator = SerialMessageGenerator()
for message in generator:
for client in Processor.connections:
client.put(message)
# this is managed by twisted's wsgi implementation
def main(environ, start_response):
queue = Queue()
Processor.connections.append(queue)
status = '200 OK'
response_headers = [
('Content-Type', 'application/json'),
('Transfer-Encoding', 'chunked')
]
start_response(status, response_headers)
return iter(queue.get, None)
And this is working great with twisted as the WSGI server (as an aside, the serial generator is a separate process connected to the processor by an inter process queue). My question is how can I detect when a client disconnects and thus remove it from the queue? My though is adding the queue as a tuple with the client socket i.e. (socket, queue) and then checking if the socket is still connected before I perform the put. However, I don't know exactly what to grab from environ. Does any one have any experience with doing this right before I hack something together?
Updated
Here is the solution I finally went with:
class IterableQueue(Queue):
def __init__(self):
Queue.__init__(self) # Queue is an old style class
ShellProcessor.connections.append(self)
def __iter__(self):
return iter(self.get, None)
def close(self):
self.put(None)
self.task_done()
ShellProcessor.connections.remove(self)
twisted calls .close()
on the iterator if present when the request is finished or interrupted. You could do something like:
# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
on_finish=lambda: Processor.connections.remove(queue))
where ResponseIterator
could be:
class ResponseIterator:
def __init__(self, iterator, on_finish=None):
self.iterator = iterator
self.on_finish = on_finish
def __iter__(self):
return self
def next(self):
return next(self.iterator)
def close(self):
if self.on_finish is not None:
self.on_finish()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With