Changing a python-gearman worker's tasks during job processing

Posted 2024-10-25 02:46:48

I'm trying to change the tasks available on a python-gearman worker during its work cycle. I want a little control over my worker processes so that they can reload from a database. Every worker needs to reload at regular intervals, but I don't want to simply kill the processes, and the service must stay available at all times, which means reloading in batches: 4 workers reload while another 4 remain available to process jobs, and then the next 4 reload.

Process:

  1. Start the reload task 4 times. Each worker that picks it up will:
    1. unregister the reload task
    2. reload the dataset
    3. register a finishReload task
    4. return
  2. Repeat step 1 until there are no workers with the reload task registered.
  3. Start the finishReload task (1) until there are no workers with the finishReload task registered.

(1) The finishReload task unregisters the finishReload task, registers the reload task, and then returns.
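The intended sequence can be sketched without a Gearman server. This is only a simulation under stated assumptions: each worker is modelled as a set of task names, the helper names `run_job` and `rolling_reload` are hypothetical, and the actual dataset reload is left out. It shows only how the advertised tasks are meant to change from batch to batch.

```python
def run_job(workers, task):
    """Hand `task` to the first worker advertising it; return its name or None."""
    for name in sorted(workers):
        if task in workers[name]:
            # Swap the advertised task, as steps 1.1-1.3 and note (1) describe.
            other = 'finishReload' if task == 'reload' else 'reload'
            workers[name].discard(task)
            workers[name].add(other)
            return name
    return None


def rolling_reload(workers, batch_size=4):
    """Simulate the batched reload; return the worker names in each batch."""
    batches = []
    # Steps 1-2: reload in batches until no worker still advertises 'reload'.
    while any('reload' in tasks for tasks in workers.values()):
        batch = [run_job(workers, 'reload') for _ in range(batch_size)]
        batches.append([name for name in batch if name is not None])
    # Step 3: run finishReload until every worker advertises 'reload' again.
    while run_job(workers, 'finishReload') is not None:
        pass
    return batches


# Eight simulated workers, each advertising 'reload' plus a normal task.
workers = dict(('w%d' % i, set(['reload', 'process'])) for i in range(8))
print(rolling_reload(workers))
# [['w0', 'w1', 'w2', 'w3'], ['w4', 'w5', 'w6', 'w7']]
```

In the simulation every worker keeps its 'process' task throughout; in a real deployment the workers in the current batch would be busy reloading while the others keep serving jobs.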

The problem is that the job fails when I change the tasks available to the worker process. There are no error messages or exceptions, just an "ERROR" in the gearmand log. Here's a quick program that reproduces the problem.

WORKER

import gearman


def reversify(gmWorker, gmJob):
    # Return the job payload reversed.
    return "".join(gmJob.data[::-1])


def strcount(gmWorker, gmJob):
    # Changing the registered tasks while this job is still running
    # is what makes the job fail.
    gmWorker.unregister_task('reversify')  # problem line
    return str(len(gmJob.data))


worker = gearman.GearmanWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)

while True:
    worker.work()

CLIENT

import gearman

client = gearman.GearmanClient(['localhost:4730'])

a = client.submit_job('reversify', 'spam and eggs')
print(a.result)
# prints: sgge dna maps

a = client.submit_job('strcount', 'spam and eggs')
...

Please let me know if there is anything I can clarify.

EDIT: I know that someone will ask to see the log I mentioned. I've posted this question to the gearman group on Google as well, and the log is available there.


Comments (2)

似最初 2024-11-01 02:46:48

At a quick glance, the problem seems to be that you start a job and then de-register the worker's ability to do that job from the job server before it has finished.

违心° 2024-11-01 02:46:48

It looks like subclassing the GearmanWorker class and adding a flag works around this issue. The job has to complete before the worker issues new commands to the server, because those commands seem to interrupt the current job. So if we override the on_job_complete function, we can check the enable/disable flag and act on it after issuing the send_job_complete command. The new worker program follows:

WORKER

import gearman


def reversify(gmWorker, gmJob):
    # Return the job payload reversed.
    return "".join(gmJob.data[::-1])


def enable_reversify(gmWorker, gmJob):
    # Only set the flag; the task is registered after this job completes.
    myWorker.enableReversify = 1
    return 'OK'


def strcount(gmWorker, gmJob):
    # Only set the flag; the task is unregistered after this job completes.
    myWorker.enableReversify = -1
    return str(len(gmJob.data))


class myWorker(gearman.GearmanWorker):

    # Class-level flag: 0 = do nothing, -1 = turn off, 1 = turn on
    enableReversify = 0

    def on_job_complete(self, current_job, job_result):
        # Report the result to the server before touching the task list.
        self.send_job_complete(current_job, job_result)
        # Check the flag here and enable or disable tasks.
        if myWorker.enableReversify == -1:
            self.unregister_task('reversify')
        elif myWorker.enableReversify == 1:
            self.register_task('reversify', reversify)
        myWorker.enableReversify = 0  # reset the flag
        return True


worker = myWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
    worker.work()
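
The single flag above could be generalized to a queue of pending changes, which might suit the reload / finishReload swap from the question. The following is a minimal sketch, not part of python-gearman: `DeferredTaskMixin`, `defer_register`, and `defer_unregister` are hypothetical names, and `FakeWorker` is a stand-in that only mimics the three GearmanWorker methods used here, so the example runs without a server. With the real library the mixin would presumably be combined with gearman.GearmanWorker instead.

```python
class DeferredTaskMixin(object):
    """Queue task changes and apply them after the job result has been sent."""

    def _pending(self):
        # Create the queue lazily so the base class __init__ stays untouched.
        if not hasattr(self, '_pending_changes'):
            self._pending_changes = []
        return self._pending_changes

    def defer_register(self, task, callback):
        self._pending().append(('register', task, callback))

    def defer_unregister(self, task):
        self._pending().append(('unregister', task, None))

    def on_job_complete(self, current_job, job_result):
        # Let the base class report the result first.
        done = super(DeferredTaskMixin, self).on_job_complete(
            current_job, job_result)
        # Then apply the queued changes in the order they were requested.
        changes, self._pending_changes = self._pending(), []
        for action, task, callback in changes:
            if action == 'register':
                self.register_task(task, callback)
            else:
                self.unregister_task(task)
        return done


class FakeWorker(object):
    """Stand-in for gearman.GearmanWorker (assumption: same method names)."""

    def __init__(self):
        self.tasks = {}
        self.log = []

    def register_task(self, task, callback):
        self.tasks[task] = callback
        self.log.append('register ' + task)

    def unregister_task(self, task):
        self.tasks.pop(task, None)
        self.log.append('unregister ' + task)

    def on_job_complete(self, current_job, job_result):
        self.log.append('complete ' + str(current_job))
        return True


class DemoWorker(DeferredTaskMixin, FakeWorker):
    pass
```

A task callback would then call `gmWorker.defer_unregister('reload')` and `gmWorker.defer_register('finishReload', ...)` instead of changing the task list directly, so the changes reach the server only after the current job has been reported as complete.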