使用 python 中的多进程模块运行多个 Gearman 进程

发布于 2024-12-22 14:25:09 字数 1090 浏览 2 评论 0原文

我想使用 python 的多处理模块运行多个 gearman 工作进程,但该进程似乎是以顺序模式执行的。如果我在多个终端中运行单独的worker.py 程序,那么它工作正常。但我想减轻在如此多的终端中手动指定worker.py 的负担。有什么替代方案吗?

import sys , os , simplejson
from fabric import *
from fabric.api import *
import gearman
from gearman import GearmanWorker
from multiprocessing import Pool


##--Global Variables--##
#Spawing minimun 5 worker threads for Gearman



#executing the Job. gmJob consist of dict[host , cmd , pass] 
def exe_job(gmWorker , gmJob ):
 print " the worker process is " , os.getpid()
 d = simplejson.loads(gmJob.data)
 env.host_string = d['host'] 
 env.password = d['pass']  #will store the password .
 cmds = d['cmd']
 print cmds
 for i in cmds:
  sudo (i )  # using fabric functions to ssh into system  
 return "job sucessfull"

def start_exe():
 #woker node id to be specified in here
 gm_worker = gearman.GearmanWorker(['localhost:4730'])
 #gm_worker.set_client_id('client1')
 gm_worker.register_task('exe_job',exe_job)
 gm_worker.work()


if __name__ == '__main__':
 p = Pool(5)
 result = p.apply_async(start_exe)
 print result.get()

I want to run multiple gearman worker process using python`s multiprocessing module , but it seems like the process is executing in sequential mode. If i run individual worker.py programs in several terminals , then it is working fine. But i want to lessen the burden of manually specifying worker.py in soo many terminals. Is there any alternative for this ?

import sys , os , simplejson
from fabric import *
from fabric.api import *
import gearman
from gearman import GearmanWorker
from multiprocessing import Pool


##--Global Variables--##
#Spawing minimun 5 worker threads for Gearman



#executing the Job. gmJob consist of dict[host , cmd , pass] 
def exe_job(gmWorker , gmJob ):
 print " the worker process is " , os.getpid()
 d = simplejson.loads(gmJob.data)
 env.host_string = d['host'] 
 env.password = d['pass']  #will store the password .
 cmds = d['cmd']
 print cmds
 for i in cmds:
  sudo (i )  # using fabric functions to ssh into system  
 return "job sucessfull"

def start_exe():
 #woker node id to be specified in here
 gm_worker = gearman.GearmanWorker(['localhost:4730'])
 #gm_worker.set_client_id('client1')
 gm_worker.register_task('exe_job',exe_job)
 gm_worker.work()


if __name__ == '__main__':
 p = Pool(5)
 result = p.apply_async(start_exe)
 print result.get()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

冧九 2024-12-29 14:25:09

我也找不到这个问题的答案,所以我深入研究并发现你基本上必须使用队列来跟踪你打开的进程和关闭的进程(对于 gearman 工作人员来说是一个错误) 。无论如何,我将它构建成一个模块并将其发布到 pypi 上。这仍然是一项正在进行的工作,但我将尝试在第二天左右添加文档和示例:

我还包括通过 json 进行通信的客户端和工作人员类(我提到这一点是因为您的示例似乎使用 json)。

让我知道你的想法。我绝对可以多看几眼来发现错误,或者告诉我我在代码中做了什么完全疯狂的事情。

I couldn't find an answer to this, either, so I dug into things and figured out that you basically have to use a queue to track which processes you have open and which have closed (an error in the case of a gearman worker). Anyway, I built it into a module and posted it on pypi. It's still very much a work in progress but I'll try to get to adding documentation and examples in the next day or so:

I also included client and worker classes that communicate via json (I mention this because your example seems to use json).

Let me know what you think. I could definitely use a few more sets of eyes on this to find bugs or tell me where I did something completely crazy with the code.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文