Conditional work-queue insertion with beanstalkd?

I'm using the Perl client for beanstalkd. I need a simple way to avoid enqueuing the same work twice.

Essentially, I need something that waits until there are K elements and then groups them together. To accomplish this, I have the producer:

insert item(s) into DB
insert a queue item into beanstalkd

And the consumer:

while ( 1 ) {
   beanstalkd.retrieve              # reserve the next queue item (blocking)
   if ( DB items >= K )
       func_to_process_all_items    # process the accumulated batch
   kill job                         # delete the queue item from beanstalkd
}
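
For concreteness, a minimal sketch of this pattern in Perl, assuming the Beanstalk::Client module from CPAN; the tube name, the batch size K, and the DB helpers count_db_items / process_all_items are placeholders, not anything from the original post:

    use strict;
    use warnings;
    use Beanstalk::Client;

    my $K = 10;   # hypothetical batch size

    # Producer: after inserting items into the DB, enqueue a signal job.
    my $producer = Beanstalk::Client->new({
        server       => "localhost",
        default_tube => "work",
    });
    # insert_items_into_db(...);          # placeholder for the DB insert
    $producer->put({ data => "check" });  # payload is just a wake-up signal

    # Consumer: reserve a signal and batch-process once enough items exist.
    my $consumer = Beanstalk::Client->new({
        server       => "localhost",
        default_tube => "work",           # watched as well as used by default
    });
    while (1) {
        my $job = $consumer->reserve;     # blocks until a job is ready
        if (count_db_items() >= $K) {     # placeholder DB count
            process_all_items();          # placeholder batch processor
        }
        $job->delete;                     # "kill job": remove it from the queue
    }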

This is linear in the number of requests/processing runs, but consider the case of:

insert 1 item
... repeat many times ...
insert 1 item

Assuming all these insertions happened before a job was retrieved, this would add N queue items, and the consumer would do something like this:

check DB, process N items
check DB, no items
... many times ...
check DB, no items

Is there a smarter way to do this so that it does not insert/process the later job requests unnecessarily?

倾城月光淡如水﹏ 2024-08-22 16:22:42

I had a related requirement. I only wanted to process a specific job once within a few minutes, but the producer could queue several instances of the same job. I used memcache to store the job identifier and set the expiry of the key to just a few minutes.

When a worker tried to add the job identifier to memcache, only the first would succeed - on failure to add the job id, the worker would delete the job. After a few minutes, the key expires from memcache and the job can be processed again.

Not particularly elegant, but it works.
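
A minimal sketch of this memcache lock in Perl, assuming the Cache::Memcached module; the key prefix, the TTL, and the job_key_from helper (which would extract the application-level job identifier from the payload) are placeholders:

    use strict;
    use warnings;
    use Cache::Memcached;

    my $memd = Cache::Memcached->new({ servers => ["127.0.0.1:11211"] });
    my $ttl  = 300;   # lock lifetime in seconds ("a few minutes")

    # add() is atomic and succeeds only if the key does not already exist,
    # so exactly one worker claims a given job within the TTL window.
    sub claim_job {
        my ($job_key) = @_;
        return $memd->add("job-lock:$job_key", 1, $ttl);
    }

    # In the worker loop (sketch):
    #   my $job = $client->reserve;
    #   if (claim_job(job_key_from($job))) {
    #       process($job);    # the first instance does the work
    #   }
    #   $job->delete;         # duplicates are deleted without processing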

与酒说心事 2024-08-22 16:22:42

Will this work for you?

  1. Create two tubes, "buffer" and "live". Your producer only ever adds to the "buffer" tube.
  2. Create two workers, one watching "buffer" and the other watching "live", each making a blocking reserve() call.
  3. Whenever the "buffer" worker's reserve() returns, it buries the job if there are fewer than K items. Once there are exactly K, it "kicks" all K jobs and transfers them to the "live" tube.
  4. The "live" watcher will then return from its own reserve() and can process the batch.

You just need to take care that a job never returns to the "buffer" queue from the buried state. A failsafe way to do this is to delete the job and then add a fresh copy to "live".

The two separate queues are only for cleaner separation. You could do the same with a single queue by burying every job until there are K-1, and then, on the arrival of the K-th job, kicking all of them live.
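
A minimal single-worker sketch of the "buffer" side in Perl, again assuming Beanstalk::Client (the batch size K is a placeholder). It uses the delete-and-re-add failsafe described above instead of kick, since kicked jobs return to the ready queue of their own tube rather than moving to "live":

    use strict;
    use warnings;
    use Beanstalk::Client;

    my $K = 10;   # hypothetical batch size

    # One connection per tube: bury and peek_buried act on the tube in use,
    # so separate clients for "buffer" and "live" avoid tube switching.
    my $buffer = Beanstalk::Client->new({ server => "localhost", default_tube => "buffer" });
    my $live   = Beanstalk::Client->new({ server => "localhost", default_tube => "live" });

    my $buried = 0;
    while (1) {
        my $job = $buffer->reserve;      # blocks until a producer adds a job

        if ($buried < $K - 1) {
            $job->bury;                  # park it; buried jobs never become ready
            $buried++;
            next;
        }

        # The K-th job has arrived: move the whole batch to "live" by
        # delete-and-re-put, so nothing can fall back into "buffer".
        $live->put({ data => $job->data });
        $job->delete;

        while (my $b = $buffer->peek_buried) {
            $live->put({ data => $b->data });
            $buffer->delete($b->id);
        }
        $buried = 0;
    }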
