Changing python-gearman worker tasks during job processing
I'm attempting to change the tasks available to a python-gearman worker during its work cycle. I want a little more control over my worker processes, specifically the ability to make them reload from a database. Every worker needs to reload at regular intervals, but I don't want to simply kill the processes, and the service must stay available the whole time, so I have to reload in batches: 4 workers reload while another 4 remain available for processing, and then the next 4 workers reload.
Process:
- Start the `reload` task 4 times. Each `reload` job does the following:
  - unregister the `reload` task
  - reload the dataset
  - register a `finishReload` task
  - return
- Repeat step 1 until there are no workers with the `reload` task registered.
- Start the `finishReload` (1) task until there are no workers with the `finishReload` task available.

(1) The `finishReload` task unregisters the `finishReload` task, registers the `reload` task, and then returns.
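The rotation above can be modeled in memory to check that every worker ends up back in its starting state. This is a toy sketch, not gearman code, and it rests on assumptions that are not in the original: each worker is just a set of registered task names, the job server is assumed to hand a job to the first worker (by name) that has the task registered, `reload_dataset()` is a hypothetical placeholder for the database reload, and `serve` is a hypothetical stand-in for the normal tasks.

```python
def reload_dataset():
    """Hypothetical placeholder for re-reading the dataset from the database."""
    pass


def handle_reload(tasks):
    """A worker's `reload` job: swap `reload` for `finishReload`."""
    tasks.discard('reload')      # this worker must not take another reload job
    reload_dataset()
    tasks.add('finishReload')    # mark the worker as reloaded


def handle_finish_reload(tasks):
    """A worker's `finishReload` job (note 1): re-arm `reload`."""
    tasks.discard('finishReload')
    tasks.add('reload')


def rolling_reload(workers, batch_size=4):
    """Run one full rotation; `workers` maps worker name -> set of task names.

    Returns the batches of worker names that reloaded together.
    """
    batches = []
    # Start `reload` batch_size times; repeat until no worker has `reload`.
    while True:
        batch = sorted(n for n, t in workers.items() if 'reload' in t)[:batch_size]
        if not batch:
            break
        for name in batch:
            handle_reload(workers[name])
        batches.append(batch)
    # Start `finishReload` until no worker has it registered.
    while True:
        pending = sorted(n for n, t in workers.items() if 'finishReload' in t)
        if not pending:
            break
        handle_finish_reload(workers[pending[0]])
    return batches


# 'serve' stands in for the normal tasks, which stay registered throughout.
workers = {'w%d' % i: {'reload', 'serve'} for i in range(8)}
print(rolling_reload(workers))
# [['w0', 'w1', 'w2', 'w3'], ['w4', 'w5', 'w6', 'w7']]
```

Because each worker drops `reload` as soon as it takes a `reload` job, no worker can be picked twice in one rotation, and the final `finishReload` pass restores every worker to its original task set.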
Now, the problem that I'm running into is that the job fails when I change the tasks that are available to the worker process. There are no error messages or exceptions, just an "ERROR" in the gearmand log. Here's a quick program that replicates the problem.
WORKER
```python
import gearman


def reversify(gmWorker, gmJob):
    # Return the job payload reversed
    return gmJob.data[::-1]


def strcount(gmWorker, gmJob):
    gmWorker.unregister_task('reversify')  # problem line
    return str(len(gmJob.data))


worker = gearman.GearmanWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
while True:
    worker.work()
```
CLIENT
```python
import gearman

client = gearman.GearmanClient(['localhost:4730'])

a = client.submit_job('reversify', 'spam and eggs')
print(a.result)
# output: sgge dna maps

a = client.submit_job('strcount', 'spam and eggs')
# ...
```
Please let me know if there is anything I can clarify.
EDIT: I know that someone will ask to see the log I mentioned. I've also posted this question to the gearman group on Google, and the log is available there.
Comments (2)
At a quick glance, the problem would seem to be that you are starting a job, then de-registering the worker's ability to do that job from the job server before it's finished.
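Given that diagnosis, one way to avoid the problem is to postpone any registration change requested inside a job until the job's result has been sent. The sketch below is a hypothetical mixin: `DeferredTaskChangesMixin`, `defer_register` and `defer_unregister` are illustrative names, not part of python-gearman. It assumes the class it is mixed into exposes `register_task`, `unregister_task` and an `on_job_complete(current_job, job_result)` hook that reports the result, as python-gearman 2.x's `GearmanWorker` does. `_StubWorker` and `Job` are stand-ins so the sketch runs without a gearmand server.

```python
import collections


class DeferredTaskChangesMixin(object):
    """Queue task (un)registrations requested during a job and apply them
    only after the job's result has been reported.

    Assumes the class it is mixed into provides register_task(),
    unregister_task() and on_job_complete(current_job, job_result).
    """

    def __init__(self, *args, **kwargs):
        super(DeferredTaskChangesMixin, self).__init__(*args, **kwargs)
        self._pending_changes = []  # list of (task, callback-or-None)

    def defer_register(self, task, callback):
        self._pending_changes.append((task, callback))

    def defer_unregister(self, task):
        self._pending_changes.append((task, None))

    def _apply_pending_changes(self):
        changes, self._pending_changes = self._pending_changes, []
        for task, callback in changes:  # apply in the order requested
            if callback is None:
                self.unregister_task(task)
            else:
                self.register_task(task, callback)

    def on_job_complete(self, current_job, job_result):
        # Let the base class report the result first, then change abilities.
        keep_working = super(DeferredTaskChangesMixin, self).on_job_complete(
            current_job, job_result)
        self._apply_pending_changes()
        return keep_working


class _StubWorker(object):
    """Stand-in for gearman.GearmanWorker that only records calls."""

    def __init__(self):
        self.abilities = {}
        self.log = []

    def register_task(self, task, callback):
        self.abilities[task] = callback
        self.log.append(('register', task))

    def unregister_task(self, task):
        self.abilities.pop(task, None)
        self.log.append(('unregister', task))

    def on_job_complete(self, current_job, job_result):
        self.log.append(('complete', job_result))
        return True


class DemoWorker(DeferredTaskChangesMixin, _StubWorker):
    pass


Job = collections.namedtuple('Job', 'data')  # minimal stand-in job object


def strcount(gmWorker, gmJob):
    gmWorker.defer_unregister('reversify')  # replaces the "problem line"
    return str(len(gmJob.data))


worker = DemoWorker()
worker.register_task('reversify', lambda w, job: job.data[::-1])
worker.register_task('strcount', strcount)
job = Job('spam and eggs')
worker.on_job_complete(job, strcount(worker, job))
print(worker.log[-2:])  # [('complete', '13'), ('unregister', 'reversify')]
```

With the real library the mixin would be combined as `class ReloadingWorker(DeferredTaskChangesMixin, gearman.GearmanWorker)`, and the job callback would call `gmWorker.defer_unregister('reversify')` instead of `unregister_task`. A queue of pending changes is used here rather than enable/disable flags, so one job can request several changes and they are applied in order.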
It looks like subclassing the GearmanWorker class and adding a few flags can work around this issue. I need to allow the job to complete before I start issuing new commands from the worker to the server, because those commands seem to interrupt the current job. So if we override the `on_job_complete` function, we can check the enable/disable flags and act on them after we issue the `send_job_complete` command. The new worker program follows:
WORKER