Building a simple remote dispatcher with multiprocessing.managers
Consider the following code:
Server:
import sys
from multiprocessing import Process
from multiprocessing.managers import BaseManager, BaseProxy

def baz(aa):
    # Toy workload: return three copies of the argument.
    l = []
    for i in range(3):
        l.append(aa)
    return l

class SolverManager(BaseManager): pass
class MyProxy(BaseProxy): pass

manager = SolverManager(address=('127.0.0.1', 50000), authkey='mpm')
manager.register('solver', callable=baz, proxytype=MyProxy)

def serve_forever(server):
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass

def runpool(n):
    # Create one server object and let n processes serve on its socket.
    server = manager.get_server()
    for i in range(int(n)):
        Process(target=serve_forever, args=(server,)).start()

if __name__ == '__main__':
    runpool(sys.argv[1])
Client:
import sys
from multiprocessing.managers import BaseManager, BaseProxy

class SolverManager(BaseManager): pass
class MyProxy(BaseProxy): pass

def main(args):
    # Register the typeid only; the server side provides the callable.
    SolverManager.register('solver')
    m = SolverManager(address=('127.0.0.1', 50000), authkey='mpm')
    m.connect()
    print m.solver(args[1])._getvalue()

if __name__ == '__main__':
    sys.exit(main(sys.argv))
If I run the server with a single process (python server.py 1), the client works as expected. But if I spawn two processes (python server.py 2) listening for connections, I get a nasty error:
$ python client.py ping
Traceback (most recent call last):
  File "client.py", line 24, in <module>
    sys.exit(main(sys.argv))
  File "client.py", line 21, in main
    print m.solver(args[1])._getvalue()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 637, in temp
    authkey=self._authkey, exposed=exp
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 894, in AutoProxy
    incref=incref)
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 700, in __init__
    self._incref()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 750, in _incref
    dispatch(conn, None, 'incref', (self._id,))
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 79, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 181, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 402, in incref
    self.id_to_refcount[ident] += 1
KeyError: '7fb51084c518'
---------------------------------------------------------------------------
My idea is pretty simple: I want to create a server that spawns a number of workers, all sharing the same listening socket and handling requests independently. Maybe I'm using the wrong tool here?
The goal is to build a 3-tier structure where all requests are handled via an HTTP server, then dispatched to nodes sitting in a cluster, and from the nodes to workers via the multiprocessing managers.
There is one public server, one node per machine, and on each machine a number of workers that depends on the number of cores. I know I could use a more sophisticated library, but for such a simple task (I'm just prototyping here) I would rather stick to the multiprocessing library. Is this possible, or should I explore other solutions directly? I feel I'm very close to having something working here... thanks.
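For what it's worth, the KeyError above is consistent with each forked server process keeping its own private copy of the manager's internal id_to_refcount table: the object created by one process's solver() call is unknown to its siblings, so the client's follow-up incref can land on a process that never created it. A workaround that stays within multiprocessing is to keep a single manager process on the socket and push the actual work into a multiprocessing.Pool. The sketch below is only a rough illustration of that idea under those assumptions, not a drop-in replacement for the server above:

import sys
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

def baz(aa):
    # Same toy workload as above.
    return [aa] * 3

class SolverManager(BaseManager): pass

if __name__ == '__main__':
    # n pool workers, e.g. one per core; the manager itself stays single.
    pool = Pool(int(sys.argv[1]))
    # Each client call to solver() runs baz in one of the pool workers;
    # the manager's handler thread blocks until the result is ready.
    SolverManager.register('solver', callable=lambda aa: pool.apply(baz, (aa,)))
    manager = SolverManager(address=('127.0.0.1', 50000), authkey='mpm')
    manager.get_server().serve_forever()

Because only one process ever touches the manager's bookkeeping, the refcount table stays consistent while the work itself is still spread across cores.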
Answer:
You're trying to reinvent a wheel that many have invented before.
It sounds to me like you're looking for a task queue: your server dispatches tasks to it, and your workers execute them.
I would recommend you have a look at Celery.
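To make that concrete, here is a minimal sketch of the question's solver as a Celery task. The module name tasks, the Redis broker/backend URLs, and the worker command are assumptions that depend on your installation:

# tasks.py -- hypothetical module name
from celery import Celery

# Assumes a Redis broker and result backend running locally; adjust the URLs.
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def baz(aa):
    # Same toy work as the original solver.
    return [aa] * 3

You would then start one worker pool per machine (for example celery -A tasks worker --concurrency=4; flags vary across Celery versions), and a client dispatches with baz.delay('ping').get(timeout=10). The broker replaces the hand-rolled manager socket, and the per-core worker count is just a command-line flag.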