实现 WSGI 流服务:(如何检测客户端断开连接)
所以我正在做的是编写一个 WSGI 流服务,它利用包装在迭代器中的队列来实现多播推送。下面是该服务的简化模型:
# this is managed by another thread
def processor_runner():
generator = SerialMessageGenerator()
for message in generator:
for client in Processor.connections:
client.put(message)
# this is managed by twisted's wsgi implementation
def main(environ, start_response):
queue = Queue()
Processor.connections.append(queue)
status = '200 OK'
response_headers = [
('Content-Type', 'application/json'),
('Transfer-Encoding', 'chunked')
]
start_response(status, response_headers)
return iter(queue.get, None)
这与twisted 作为 WSGI 服务器配合得很好(顺便说一句,串行生成器是一个通过进程间队列连接到处理器的单独进程)。我的问题是如何检测客户端何时断开连接并将其从队列中删除?我的想法是将队列作为客户端套接字即(套接字、队列)的元组添加,然后在执行放置之前检查套接字是否仍处于连接状态。但是,我不知道到底要从环境中获取什么。在我将某些东西组合在一起之前,有人有这样做的经验吗?
更新
这是我最终采用的解决方案:
class IterableQueue(Queue):
def __init__(self):
Queue.__init__(self) # Queue is an old style class
ShellProcessor.connections.append(self)
def __iter__(self):
return iter(self.get, None)
def close(self):
self.put(None)
self.task_done()
ShellProcessor.connections.remove(self)
So what I am doing is writing a WSGI streaming service which makes use of a Queue wrapped in an iterator to implement a multicast push. What follows is a simplified model of the service:
# this is managed by another thread
def processor_runner():
generator = SerialMessageGenerator()
for message in generator:
for client in Processor.connections:
client.put(message)
# this is managed by twisted's wsgi implementation
def main(environ, start_response):
queue = Queue()
Processor.connections.append(queue)
status = '200 OK'
response_headers = [
('Content-Type', 'application/json'),
('Transfer-Encoding', 'chunked')
]
start_response(status, response_headers)
return iter(queue.get, None)
And this is working great with twisted as the WSGI server (as an aside, the serial generator is a separate process connected to the processor by an inter process queue). My question is how can I detect when a client disconnects and thus remove it from the queue? My though is adding the queue as a tuple with the client socket i.e. (socket, queue) and then checking if the socket is still connected before I perform the put. However, I don't know exactly what to grab from environ. Does any one have any experience with doing this right before I hack something together?
Updated
Here is the solution I finally went with:
class IterableQueue(Queue):
def __init__(self):
Queue.__init__(self) # Queue is an old style class
ShellProcessor.connections.append(self)
def __iter__(self):
return iter(self.get, None)
def close(self):
self.put(None)
self.task_done()
ShellProcessor.connections.remove(self)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
在请求完成或中断时,twisted 会调用迭代器上的
.close()
(如果存在)。您可以执行以下操作:其中
ResponseIterator
可以是:twisted calls
.close()
on the iterator if present when the request is finished or interrupted. You could do something like:where
ResponseIterator
could be:阅读:
http://groups.google.com/group/modwsgi/browse_frm/thread /8ebd9aca9d317ac9
有些是 mod_wsgi 特定的,但一般来说,相同的问题适用于任何 WSGI 服务器。
Read:
http://groups.google.com/group/modwsgi/browse_frm/thread/8ebd9aca9d317ac9
Some is mod_wsgi specific, but in general same issues apply to any WSGI server.