
Implementing a WSGI Streaming Service: (how to detect client disconnects)

I am writing a WSGI streaming service that uses a Queue wrapped in an iterator to implement a multicast push. A simplified model of the service follows:

# this is managed by another thread
def processor_runner():
    generator = SerialMessageGenerator()
    for message in generator:
        for client in Processor.connections:
            client.put(message)

# this is managed by twisted's wsgi implementation
def main(environ, start_response):
    queue = Queue()
    Processor.connections.append(queue)
    status = '200 OK'
    response_headers = [
        ('Content-Type', 'application/json'),
        ('Transfer-Encoding', 'chunked')
    ]
    start_response(status, response_headers)
    return iter(queue.get, None)

This works well with twisted as the WSGI server (as an aside, the serial generator is a separate process connected to the processor by an inter-process queue). My question is: how can I detect when a client disconnects, so that I can remove its queue from the connection list? My thought is to store each queue as a tuple with the client socket, i.e. (socket, queue), and then check whether the socket is still connected before I perform the put. However, I don't know exactly what to grab from environ. Does anyone have experience doing this properly, before I hack something together?

Updated

Here is the solution I finally went with:

class IterableQueue(Queue):

    def __init__(self):
        Queue.__init__(self) # Queue is an old style class
        ShellProcessor.connections.append(self)

    def __iter__(self):
        return iter(self.get, None)

    def close(self):
        self.put(None)
        self.task_done()
        ShellProcessor.connections.remove(self)
Bashwork asked Nov 13 '22 13:11
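The IterableQueue approach above can be tried outside a server. The following is a minimal sketch, assuming Python 3 (where the module is named queue) and using a module-level list named connections as a hypothetical stand-in for ShellProcessor.connections. The broadcast helper is also hypothetical. Unlike the original, this version unregisters the queue before putting the None sentinel, so that no message can be queued behind the sentinel.

```python
import queue

connections = []  # hypothetical stand-in for ShellProcessor.connections


class IterableQueue(queue.Queue):
    """A queue that registers itself for broadcasts and iterates until None."""

    def __init__(self):
        super().__init__()
        connections.append(self)

    def __iter__(self):
        # Yield queued items until the None sentinel is read.
        return iter(self.get, None)

    def close(self):
        # Unregister first so no further messages are queued for this client,
        # then unblock any consumer currently waiting in get().
        if self in connections:
            connections.remove(self)
        self.put(None)


def broadcast(message):
    # Iterate over a copy so a close() during the loop cannot disturb it.
    for client in list(connections):
        client.put(message)


client = IterableQueue()
broadcast(b"one")
broadcast(b"two")
client.close()
broadcast(b"three")  # not delivered: the client is already unregistered
received = list(client)
```

Because a WSGI server calls close() on the object the application returns, returning the IterableQueue instance itself is what lets the server trigger the cleanup.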


1 Answer

Twisted calls .close() on the returned iterator, if it defines one, when the request finishes or is interrupted. You could do something like:

# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
     on_finish=lambda: Processor.connections.remove(queue))

where ResponseIterator could be:

class ResponseIterator:

    def __init__(self, iterator, on_finish=None):
        self.iterator = iterator
        self.on_finish = on_finish

    def __iter__(self):
        return self

    def next(self):
        return next(self.iterator)

    __next__ = next  # Python 3 spells this method __next__

    def close(self):
        # called by the server when the request ends or the client disconnects
        if self.on_finish is not None:
            self.on_finish()
jfs answered Dec 25 '22 12:12
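To see the close() hook fire without running twisted, the server side can be imitated by hand. This is only a sketch under stated assumptions: it targets Python 3, the connections list stands in for Processor.connections, and drain_like_a_server is a hypothetical toy loop, not twisted's actual implementation. It relies on the WSGI rule (PEP 3333) that the server calls close() on the returned iterable if that method exists.

```python
import queue

connections = []  # hypothetical stand-in for Processor.connections


class ResponseIterator:
    """Wrap an iterator and run a callback when the server calls close()."""

    def __init__(self, iterator, on_finish=None):
        self.iterator = iterator
        self.on_finish = on_finish

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.iterator)

    def close(self):
        if self.on_finish is not None:
            self.on_finish()


def app(environ, start_response):
    q = queue.Queue()
    connections.append(q)
    start_response("200 OK", [("Content-Type", "application/json")])
    return ResponseIterator(iter(q.get, None),
                            on_finish=lambda: connections.remove(q))


def drain_like_a_server(result, max_chunks):
    """Toy server loop: read a few chunks, then pretend the client left."""
    chunks = []
    try:
        for chunk in result:
            chunks.append(chunk)
            if len(chunks) == max_chunks:
                break  # simulated client disconnect
    finally:
        # PEP 3333: call close() on the iterable if it has that method.
        if hasattr(result, "close"):
            result.close()
    return chunks


result = app({}, lambda status, headers: None)
for message in (b"a", b"b", b"c"):
    for client in connections:
        client.put(message)
registered_before = len(connections)
chunks = drain_like_a_server(result, max_chunks=2)
```

After the simulated disconnect, the queue is no longer in connections, so the broadcasting thread stops feeding a client that has gone away.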