Python: sharing a network socket with multiprocessing.Manager
I am currently writing an nginx proxy server module with a request queue in front, so that requests are not dropped when the servers behind nginx cannot handle them (nginx is configured as a load balancer).
I am using:
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
The idea is to put each request in a queue before handling it. I know multiprocessing.Queue only supports simple objects and cannot carry raw sockets, so I tried using a multiprocessing.Manager to create a shared dictionary. The Manager also uses sockets for its connections, so this approach failed too. Is there a way to share network sockets between processes?
Here is the problematic part of the code:
class ProxyServer(Threader, HTTPServer):

    def __init__(self, server_address, bind_and_activate=True):
        HTTPServer.__init__(self, server_address, ProxyHandler,
                            bind_and_activate)
        self.manager = multiprocessing.Manager()
        self.conn_dict = self.manager.dict()
        self.ticket_queue = multiprocessing.Queue(maxsize=10)
        self._processes = []
        self.add_worker(5)

    def process_request(self, request, client):
        stamp = time.time()
        print "We are processing"
        self.conn_dict[stamp] = (request, client)  # the program crashes here
        # Exception happened during processing of request from ('172.28.192.34', 49294)
        # Traceback (most recent call last):
        #   File "/usr/lib64/python2.6/SocketServer.py", line 281, in _handle_request_noblock
        #     self.process_request(request, client_address)
        #   File "./nxproxy.py", line 157, in process_request
        #     self.conn_dict[stamp] = (request, client)
        #   File "<string>", line 2, in __setitem__
        #   File "/usr/lib64/python2.6/multiprocessing/managers.py", line 725, in _callmethod
        #     conn.send((self._id, methodname, args, kwds))
        # TypeError: expected string or Unicode object, NoneType found
        self.ticket_queue.put(stamp)

    def add_worker(self, number_of_workers):
        for worker in range(number_of_workers):
            print "Starting worker %d" % worker
            proc = multiprocessing.Process(target=self._worker, args=(self.conn_dict,))
            self._processes.append(proc)
            proc.start()

    def _worker(self, conn_dict):
        while 1:
            ticket = self.ticket_queue.get()
            print conn_dict
            a = 0
            while a == 0:
                try:
                    request, client = conn_dict[ticket]
                    a = 1
                except Exception:
                    pass
            print "We are threading!"
            self.threader(request, client)
Comments (3)
You can use multiprocessing.reduction to transfer connection and socket objects between processes.
Example Code
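The idea can be sketched roughly as follows. This is a minimal illustration and not the answerer's code. It assumes Python 3 on Unix (the question itself targets Python 2.6, where the helpers differ), and it relies on the `send_handle` and `recv_handle` helpers that CPython's `multiprocessing.reduction` module provides. The names `worker` and `demo_send_handle` are hypothetical, and a local socket pair stands in for an accepted client connection.

```python
import multiprocessing
import socket
from multiprocessing import reduction


def worker(channel):
    # Receive a raw file descriptor from the parent and wrap it in a socket.
    fd = reduction.recv_handle(channel)
    with socket.socket(fileno=fd) as conn:
        request = conn.recv(1024)
        conn.sendall(b"handled: " + request)


def demo_send_handle():
    # Assumption: the "fork" start method, which is only available on Unix.
    ctx = multiprocessing.get_context("fork")
    # On Unix a duplex Pipe is backed by a Unix socket pair, which is what
    # send_handle needs in order to carry a descriptor.
    parent_end, worker_end = ctx.Pipe()
    proc = ctx.Process(target=worker, args=(worker_end,))
    proc.start()

    # Stand-in for an accepted client connection. It is created after the
    # worker was started, so the worker cannot have inherited it.
    client_side, server_side = socket.socketpair()
    reduction.send_handle(parent_end, server_side.fileno(), proc.pid)
    server_side.close()  # the descriptor has been handed over to the worker

    client_side.sendall(b"GET /")
    reply = client_side.recv(1024)
    client_side.close()
    proc.join()
    return reply


if __name__ == "__main__":
    print(demo_send_handle())
```

In a real server the parent would pass the descriptor of each socket returned by `accept()` instead of one end of a socket pair.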
It looks like you need to pass file descriptors between processes (assuming Unix here; I have no clue about Windows). I've never done this in Python, but here is a link to the python-passfd project, which you might want to check.
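For illustration, descriptor passing can also be sketched with the standard library alone, without python-passfd. The sketch below assumes Python 3.9 or later on Unix, where `socket.send_fds` and `socket.recv_fds` are available. To stay short it keeps both ends of the Unix-domain channel in one process; in practice each end would live in a different process. The function name `pass_fd_demo` is hypothetical.

```python
import socket


def pass_fd_demo():
    # Unix-domain channel over which the descriptor travels.
    sender, receiver = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    # Stand-in for a client connection whose server side we hand over.
    client_side, server_side = socket.socketpair()

    # Send a short message together with the descriptor (SCM_RIGHTS).
    socket.send_fds(sender, [b"fd"], [server_side.fileno()])

    # The receiving end gets a new descriptor referring to the same socket.
    msg, fds, _flags, _addr = socket.recv_fds(receiver, 1024, 1)
    server_side.close()  # the original descriptor is no longer needed

    handed_over = socket.socket(fileno=fds[0])
    handed_over.sendall(b"hello")
    data = client_side.recv(1024)

    for sock in (handed_over, client_side, sender, receiver):
        sock.close()
    return msg, data


if __name__ == "__main__":
    print(pass_fd_demo())
```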
You can look at this code: https://gist.github.com/sunilmallya/4662837. It is a multiprocessing.reduction socket server in which the parent process accepts connections and then passes them to a client (worker) process.
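The pattern described here (the parent accepts, a worker handles) might look like the sketch below. It is not the code from the gist. It assumes Python 3.8 or later on Unix, where sending a socket object through a `multiprocessing` connection works because `multiprocessing.reduction` registers a pickling routine for sockets. The names `worker` and `serve_one_request`, the loopback address, and the `None` shutdown sentinel are all illustrative choices.

```python
import multiprocessing
import socket


def worker(inbox):
    # Handle connections handed over by the parent until None arrives.
    while True:
        conn = inbox.recv()  # the socket is rebuilt in this process
        if conn is None:
            break
        with conn:
            request = conn.recv(1024)
            conn.sendall(b"worker got: " + request)


def serve_one_request():
    # Assumption: the "fork" start method, which is only available on Unix.
    ctx = multiprocessing.get_context("fork")
    parent_end, worker_end = ctx.Pipe()
    proc = ctx.Process(target=worker, args=(worker_end,))
    proc.start()

    # The parent owns the listening socket; port 0 picks a free port.
    listener = socket.create_server(("127.0.0.1", 0))
    port = listener.getsockname()[1]

    client = socket.create_connection(("127.0.0.1", port))
    conn, _addr = listener.accept()
    parent_end.send(conn)  # pickled via multiprocessing.reduction
    conn.close()           # the parent no longer needs its copy

    client.sendall(b"ping")
    reply = client.recv(1024)
    client.close()

    parent_end.send(None)  # ask the worker to stop
    proc.join()
    listener.close()
    return reply


if __name__ == "__main__":
    print(serve_one_request())
```

A pipe is used instead of `multiprocessing.Queue` because a queue pickles objects in a background thread, so closing the accepted socket right after `put()` could race with that pickling.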