实现 WSGI 流服务:(如何检测客户端断开连接)

发布于 2024-12-19 12:48:11 字数 1235 浏览 3 评论 0原文

所以我正在做的是编写一个 WSGI 流服务,它利用包装在迭代器中的队列来实现多播推送。下面是该服务的简化模型:

# this is managed by another thread
def processor_runner():
    generator = SerialMessageGenerator()
    for message in generator:
      for client in Processor.connections:
          client.put(message)

# this is managed by twisted's wsgi implementation
def main(environ, start_response):
    queue = Queue()
    Processor.connections.append(queue)
    status = '200 OK'
    response_headers = [
        ('Content-Type', 'application/json'),
        ('Transfer-Encoding', 'chunked')
    ]
    start_response(status, response_headers)
    return iter(queue.get, None)

这与twisted 作为 WSGI 服务器配合得很好(顺便说一句,串行生成器是一个通过进程间队列连接到处理器的单独进程)。我的问题是如何检测客户端何时断开连接并将其从队列中删除?我的想法是将队列作为客户端套接字即(套接字、队列)的元组添加,然后在执行放置之前检查套接字是否仍处于连接状态。但是,我不知道到底要从环境中获取什么。在我将某些东西组合在一起之前,有人有这样做的经验吗?

更新

这是我最终采用的解决方案:

class IterableQueue(Queue):

def __init__(self):
    Queue.__init__(self) # Queue is an old style class
    ShellProcessor.connections.append(self)

def __iter__(self):
    return iter(self.get, None)

def close(self):
    self.put(None)
    self.task_done()
    ShellProcessor.connections.remove(self)

So what I am doing is writing a WSGI streaming service which makes use of a Queue wrapped in an iterator to implement a multicast push. What follows is a simplified model of the service:

# this is managed by another thread
def processor_runner():
    generator = SerialMessageGenerator()
    for message in generator:
      for client in Processor.connections:
          client.put(message)

# this is managed by twisted's wsgi implementation
def main(environ, start_response):
    queue = Queue()
    Processor.connections.append(queue)
    status = '200 OK'
    response_headers = [
        ('Content-Type', 'application/json'),
        ('Transfer-Encoding', 'chunked')
    ]
    start_response(status, response_headers)
    return iter(queue.get, None)

And this is working great with twisted as the WSGI server (as an aside, the serial generator is a separate process connected to the processor by an inter process queue). My question is how can I detect when a client disconnects and thus remove it from the queue? My though is adding the queue as a tuple with the client socket i.e. (socket, queue) and then checking if the socket is still connected before I perform the put. However, I don't know exactly what to grab from environ. Does any one have any experience with doing this right before I hack something together?

Updated

Here is the solution I finally went with:

class IterableQueue(Queue):

def __init__(self):
    Queue.__init__(self) # Queue is an old style class
    ShellProcessor.connections.append(self)

def __iter__(self):
    return iter(self.get, None)

def close(self):
    self.put(None)
    self.task_done()
    ShellProcessor.connections.remove(self)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

べ繥欢鉨o。 2024-12-26 12:48:12

在请求完成或中断时,twisted 会调用迭代器上的 .close()(如果存在)。您可以执行以下操作:

# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
     on_finish=lambda: Processor.connections.remove(queue))

其中 ResponseIterator 可以是:

class ResponseIterator:

  def __init__(self, iterator, on_finish=None):
      self.iterator = iterator
      self.on_finish = on_finish

  def __iter__(self):
      return self

  def next(self):
      return next(self.iterator)

  def close(self):
      if self.on_finish is not None:
         self.on_finish()

twisted calls .close() on the iterator if present when the request is finished or interrupted. You could do something like:

# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
     on_finish=lambda: Processor.connections.remove(queue))

where ResponseIterator could be:

class ResponseIterator:

  def __init__(self, iterator, on_finish=None):
      self.iterator = iterator
      self.on_finish = on_finish

  def __iter__(self):
      return self

  def next(self):
      return next(self.iterator)

  def close(self):
      if self.on_finish is not None:
         self.on_finish()
已下线请稍等 2024-12-26 12:48:12

阅读:

http://groups.google.com/group/modwsgi/browse_frm/thread /8ebd9aca9d317ac9

有些是 mod_wsgi 特定的,但一般来说,相同的问题适用于任何 WSGI 服务器。

Read:

http://groups.google.com/group/modwsgi/browse_frm/thread/8ebd9aca9d317ac9

Some is mod_wsgi specific, but in general same issues apply to any WSGI server.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文