Conditional work queue insertion for beanstalkd?
I'm using the Perl client of beanstalkd. I need a simple way to not enqueue the same work twice.
Basically, I need something that waits until there are K elements and then groups them together. To accomplish this, I have the producer:
insert item(s) into DB
insert a queue item into beanstalkd
And the consumer:
while ( 1 ) {
beanstalkd.retrieve
if ( DB items >= K )
func_to_process_all_items
kill job
}
This is linear in the number of requests/processing, but in the case of:
insert 1 item
... repeat many times ...
insert 1 item
Assuming all these insertions happened before a job was retrieved, this would add N queue items, and it would do something like this:
check DB, process N items
check DB, no items
... many times ...
check DB, no items
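The waste described above can be sketched with a minimal in-memory stand-in (Python; `K`, `N`, and all names here are illustrative assumptions, not the real beanstalkd or Perl-client API):

```python
from collections import deque

K = 5          # batch size threshold
N = 12         # items inserted before the consumer first runs (assumption)

db_items = 0       # stand-in for the DB row count
queue = deque()    # stand-in for the beanstalkd tube

# Producer: every insert adds one DB item and one queue job.
for _ in range(N):
    db_items += 1
    queue.append("job")

# Consumer: each job triggers a DB check; only the job that sees
# >= K items does real work, the rest are useless wake-ups.
useful, wasted = 0, 0
while queue:
    queue.popleft()
    if db_items >= K:
        # func_to_process_all_items: drain everything at once
        db_items = 0
        useful += 1
    else:
        wasted += 1

print(useful, wasted)  # 1 useful batch, N - 1 wasted checks
```

The first job processes all N items in one go, so the remaining N - 1 jobs each pay a DB check for nothing, which is exactly the inefficiency the question is about.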
Is there a smarter way to do this so that it does not insert/process the later job requests unnecessarily?
I had a related requirement. I only wanted to process a specific job once within a few minutes, but the producer could queue several instances of the same job. I used memcache to store the job identifier and set the expiry of the key to just a few minutes.
When a worker tried to add the job identifier to memcache, only the first would succeed - on failure to add the job id, the worker would delete the job. After a few minutes, the key expires from memcache and the job can be processed again.
Not particularly elegant, but it works.
Will this work for you?:

- producers insert jobs into a buffer queue
- a moderator process reserve()s each job from the buffer queue and buries it
- when the K-th job arrives, all the buried jobs are kicked into a "live" queue, from which the consumers reserve()

You just need to take care that a job never returns from the buried state to the buffer queue. A failsafe way to do this might be to delete it and then add it to "live".

The two separate queues are only for cleaner separation. You could do the same with a single queue by burying every job until there are K-1, and then, on the arrival of the K-th job, kicking all of them live.
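The single-queue variant from the last paragraph (bury every job until K-1 are buried, then kick the whole batch on the K-th arrival) can be sketched with in-memory lists standing in for the beanstalkd ready/buried states (all names here are illustrative, not the real protocol commands):

```python
from collections import deque

K = 4             # batch size threshold (assumption)
ready = deque()   # jobs in the ready state
buried = []       # jobs in the buried state

def put(job):
    ready.append(job)

def moderator_step(batches):
    """Reserve the next ready job; bury it until K jobs have
    accumulated, then release them all as one live batch."""
    job = ready.popleft()
    if len(buried) < K - 1:
        buried.append(job)          # park it in the buried state
    else:
        batch = buried + [job]      # K-th job arrived: kick all of them
        buried.clear()
        batches.append(batch)       # "live": process the batch together

batches = []
for i in range(10):
    put(i)
while ready:
    moderator_step(batches)

print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(buried)   # [8, 9] still waiting for the K-th job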