Polling with delayed_job

Posted 2024-10-30 20:15:45 · 1202 words · 2 views · 0 comments

I have a process which generally takes a few seconds to complete, so I'm trying to use delayed_job to handle it asynchronously. The job itself works fine; my question is how to go about polling the job to find out if it's done.

I can get an id from delayed_job by simply assigning it to a variable:

job = Available.delay.dosomething(:var => 1234)

+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| id   | priority | attempts | handler    | last_error | run_at      | locked_at | failed_at | locked_by | created_at | updated_at  |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| 4037 | 0        | 0        | --- !ru... |            | 2011-04-... |           |           |           | 2011-04... | 2011-04-... |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+

But as soon as it completes the job it deletes it and searching for the completed record returns an error:

@job=Delayed::Job.find(4037)

ActiveRecord::RecordNotFound: Couldn't find Delayed::Backend::ActiveRecord::Job with ID=4037

@job= Delayed::Job.exists?(params[:id])

Should I bother to change this, and maybe postpone the deletion of completed records? I'm not sure how else I can get a notification of its status. Or is polling for a dead record as proof of completion OK? Has anyone else faced something similar?

Comments (6)

咽泪装欢 2024-11-06 20:15:45

Let's start with the API. I'd like to have something like the following.

@available.working? # => true or false, so we know it's running
@available.finished? # => true or false, so we know it's finished (already ran)

Now let's write the job.

class AwesomeJob < Struct.new(:options)

  def perform
    do_something_with(options[:var])
  end

end

So far so good. We have a job. Now let's write logic that enqueues it. Since Available is the model responsible for this job, let's teach it how to start this job.

class Available < ActiveRecord::Base

  def start_working!
    Delayed::Job.enqueue(AwesomeJob.new(options))
  end

  def working?
    # not sure what to put here yet
  end

  def finished?
    # not sure what to put here yet
  end

end

So how do we know if the job is working or not? There are a few ways, but in Rails it just feels right that when my model creates something, it's usually associated with that something. How do we associate? Using ids in the database. Let's add a job_id to the Available model.

While we're at it, how do we know whether the job is not working because it already finished, or because it hasn't started yet? One way is to check for what the job actually did. If it created a file, check whether the file exists. If it computed a value, check that the result was written. Some jobs are not as easy to check though, since there may be no clear verifiable result of their work. In such a case, you can use a flag or a timestamp in your model. Assuming this is our case, let's add a job_finished_at timestamp to distinguish a job that has not run yet from one that has already finished.

class AddJobIdToAvailable < ActiveRecord::Migration
  def self.up
    add_column :available, :job_id, :integer
    add_column :available, :job_finished_at, :datetime
  end

  def self.down
    remove_column :available, :job_id
    remove_column :available, :job_finished_at
  end
end

Alright. So now let's actually associate Available with its job as soon as we enqueue the job, by modifying the start_working! method.

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options))
  update_attribute(:job_id, job.id)
end

Great. At this point I could've written belongs_to :job, but we don't really need that.

So now we know how to write the working? method, so easy.

def working?
  job_id.present?
end

But how do we mark the job finished? Nobody knows a job has finished better than the job itself. So let's pass available_id into the job (as one of the options) and use it in the job. For that we need to modify the start_working! method to pass the id.

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options.merge(:available_id => id)))
  update_attribute(:job_id, job.id)
end

And we should add the logic into the job to update our job_finished_at timestamp when it's done.

class AwesomeJob < Struct.new(:options)

  def perform
    available = Available.find(options[:available_id])
    do_something_with(options[:var])

    # Depending on whether you consider an error'ed job to be finished
    # you may want to put this under an ensure. This way the job
    # will be deemed finished even if it error'ed out.
    available.update_attribute(:job_finished_at, Time.current)
  end

end
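As an aside on the comment inside perform: the ensure variant can be sketched in plain Ruby, with no Rails or delayed_job loaded. FakeAvailable, mark_finished! and do_something_with are hypothetical stand-ins for the model and the real work (assumptions of this sketch, not part of the answer's code). The timestamp is set in an ensure clause, so it is recorded whether or not the work raises.

```ruby
# Hypothetical stand-in for the Available model; it only tracks the timestamp.
class FakeAvailable
  attr_reader :job_finished_at

  def mark_finished!
    @job_finished_at = Time.now
  end
end

class EnsuredJob < Struct.new(:available, :should_fail)
  def perform
    do_something_with(should_fail)
  ensure
    # Runs on success and on failure, so the record is always marked.
    available.mark_finished!
  end

  private

  # Placeholder for the real work; raises on request to simulate an error.
  def do_something_with(fail_flag)
    raise "simulated failure" if fail_flag
    :ok
  end
end

ok_record = FakeAvailable.new
EnsuredJob.new(ok_record, false).perform

failed_record = FakeAvailable.new
begin
  EnsuredJob.new(failed_record, true).perform
rescue RuntimeError
  # The error still propagates, so a job runner could see it and retry.
end
```

One thing to weigh with this design: if the runner retries a failed job, the record would already look finished while the retry is still pending, so this only fits if "finished" is meant to include "failed".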

With this code in place we know how to write our finished? method.

def finished?
  job_finished_at.present?
end

And we're done. Now we can simply poll against @available.working? and @available.finished?. You also gain the convenience of knowing exactly which job was created for your Available by checking @available.job_id. You can easily turn it into a real association by saying belongs_to :job.
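To make the polling side concrete, here is a minimal sketch in plain Ruby of how the two predicates could be collapsed into a single status value. AvailableState and job_status are hypothetical names introduced for illustration; the class only mimics the two columns from the migration. Since working? as written stays true once job_id is set, the sketch checks finished? first.

```ruby
# Hypothetical stand-in for the Available model with the two columns
# added by the migration (job_id and job_finished_at).
class AvailableState
  attr_accessor :job_id, :job_finished_at

  def working?
    !job_id.nil?
  end

  def finished?
    !job_finished_at.nil?
  end

  # Collapses the two flags into one value a polling endpoint could return.
  def job_status
    return :finished if finished?
    return :working if working?
    :not_started
  end
end

record = AvailableState.new
before_enqueue = record.job_status

record.job_id = 4037 # what start_working! would store
while_running = record.job_status

record.job_finished_at = Time.now # what the job would store when done
after_finish = record.job_status
```

A controller action could return this symbol as JSON, and the client would stop polling once it sees the finished value.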

旧伤还要旧人安 2024-11-06 20:15:45

I ended up using a combination of delayed_job with an after(job) callback, which populates a memcached object keyed by the same ID as the job that was created. This way I minimize the number of times I hit the database asking for the status of the job, and poll the memcached object instead. It contains the entire object I need from the completed job, so I don't even need an extra round trip. I got the idea from an article by the GitHub team, who did pretty much the same thing.

https://github.com/blog/467-smart-js-polling

I also used a jQuery plugin for the polling, which polls less and less frequently and gives up after a certain number of retries:

https://github.com/jeremyw/jquery-smart-poll

Seems to work great.

def after(job)
  prices = Room.prices.where("space_id = ? AND bookdate BETWEEN ? AND ?", space_id.to_i, date_from, date_to).to_a
  Rails.cache.fetch(job.id) do
    bed = Bed.new(:space_id => space_id, :date_from => date_from, :date_to => date_to, :prices => prices)
  end
end
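
The answer's poller runs in the browser, but the same idea (read a cached result by job id, wait longer after each miss, give up after a few tries) can be sketched in Ruby. Everything here is an assumption for illustration: FakeCache is an in-memory stand-in for Rails.cache, and poll_for_result with its max_attempts, base_delay and sleeper parameters are hypothetical names, not part of any library.

```ruby
# In-memory stand-in for Rails.cache (memcached in the answer above).
class FakeCache
  def initialize
    @store = {}
  end

  def write(key, value)
    @store[key] = value
  end

  def read(key)
    @store[key]
  end
end

# Polls the cache for a job's result, doubling the wait after each miss and
# giving up after max_attempts. The sleeper is injectable so the sketch can
# run without actually sleeping.
def poll_for_result(cache, job_id, max_attempts: 5, base_delay: 0.5, sleeper: ->(s) { sleep(s) })
  delay = base_delay
  max_attempts.times do |attempt|
    result = cache.read(job_id)
    return result unless result.nil?
    break if attempt == max_attempts - 1
    sleeper.call(delay)
    delay *= 2
  end
  nil
end

cache = FakeCache.new
waits = []

# Simulates the job's after hook writing its result once the poller has
# waited twice.
sleeper = lambda do |seconds|
  waits << seconds
  cache.write(4037, { :space_id => 1, :prices => [100, 120] }) if waits.size == 2
end

found = poll_for_result(cache, 4037, sleeper: sleeper)
missing = poll_for_result(cache, 9999, max_attempts: 3, sleeper: ->(_s) {})
```

Because the cached value is the whole result rather than a done flag, a successful poll needs no second request to fetch the data.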
另类 2024-11-06 20:15:45

I think the best way would be to use the callbacks available in delayed_job.
These include success, error and after (delayed_job also defines before, enqueue and failure hooks),
so you can put some code in your model with after:

class ToBeDelayed
  def perform
    # do something
  end

  def after(job)
    # do something
  end
end

If you insist on using obj.delay.method instead, you'll have to monkey patch Delayed::PerformableMethod and add the after method there.
IMHO this is far better than polling for some value which might even be backend-specific (ActiveRecord vs. Mongoid, for instance).
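
To show when each hook would be expected to fire, here is a minimal sketch in plain Ruby. run_with_hooks is a simplified stand-in written for this example and is not delayed_job's actual worker code; the :fake_job_record symbol is likewise a placeholder for the job record the real hooks receive.

```ruby
# Simplified stand-in for a worker, NOT delayed_job's real implementation:
# it only illustrates the order in which the hooks are expected to run.
def run_with_hooks(payload, job_record)
  payload.perform
  payload.success(job_record) if payload.respond_to?(:success)
rescue StandardError => e
  payload.error(job_record, e) if payload.respond_to?(:error)
ensure
  payload.after(job_record) if payload.respond_to?(:after)
end

class ToBeDelayed
  attr_reader :events

  def initialize(fail_flag = false)
    @fail_flag = fail_flag
    @events = []
  end

  def perform
    raise "simulated failure" if @fail_flag
    @events << :perform
  end

  def success(job)
    @events << :success
  end

  def error(job, exception)
    @events << :error
  end

  # Runs in both cases, which makes it a natural place to record completion.
  def after(job)
    @events << :after
  end
end

good = ToBeDelayed.new
run_with_hooks(good, :fake_job_record)

bad = ToBeDelayed.new(true)
run_with_hooks(bad, :fake_job_record)
```

In this sketch a successful run records perform, success, after, while a failing run records error, after.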

一片旧的回忆 2024-11-06 20:15:45

The simplest method of accomplishing this is to change your polling action to be something similar to the following:

def poll
  @job = Delayed::Job.find_by_id(params[:job_id])

  if @job.nil?
    # The job has completed and is no longer in the database.
  else
    if @job.last_error.nil?
      # The job is still in the queue and has not been run.
    else
      # The job has encountered an error.
    end
  end
end

Why does this work? When Delayed::Job runs a job from the queue, it deletes it from the database if successful. If the job fails, the record stays in the queue to be run again later, and the last_error attribute is set to the encountered error. Using these two behaviours, you can check for deleted records to see if they were successful.

The benefits to the method above are:

  • You get the polling effect that you were looking for in your original post
  • Using a simple logic branch, you can provide feedback to the user if there is an error in processing the job

You can encapsulate this functionality in a model method by doing something like the following:

# Include this in your initializers somewhere.
# Named JobQueue rather than Queue, because Ruby already defines a core Queue class.
class JobQueue < Delayed::Job
  def self.status(id)
    job = find_by_id(id)
    return "success" if job.nil?
    job.last_error.nil? ? "queued" : "failure"
  end
end

# Use this method in your poll method like so:
def poll
  status = JobQueue.status(params[:id])
  if status == "success"
    # Success, notify the user!
  elsif status == "failure"
    # Failure, notify the user!
  end
end
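
The three-way status logic can be exercised without a database. In this minimal sketch, FakeJob and FakeJobTable are hypothetical in-memory stand-ins for a delayed_jobs row and for find_by_id, and job_status is a free function written for illustration. One caveat worth checking against your own configuration: if failed jobs are also removed once their retries are exhausted, a missing record would not by itself prove success.

```ruby
# Hypothetical stand-in for a row in the delayed_jobs table.
FakeJob = Struct.new(:id, :last_error)

# Hypothetical in-memory lookup that mimics find_by_id: it returns nil when
# no record has the given id.
class FakeJobTable
  def initialize(jobs)
    @jobs = jobs.each_with_object({}) { |job, index| index[job.id] = job }
  end

  def find_by_id(id)
    @jobs[id]
  end
end

def job_status(table, id)
  job = table.find_by_id(id)
  return "success" if job.nil? # record gone: assumed to have completed
  job.last_error.nil? ? "queued" : "failure"
end

table = FakeJobTable.new([
  FakeJob.new(1, nil),                 # still waiting in the queue
  FakeJob.new(2, "RuntimeError: boom") # ran and failed at least once
])

statuses = [1, 2, 3].map { |id| job_status(table, id) }
```

Job 3 was never in the table, which also shows the weakness of this approach: an id that never existed looks the same as a job that succeeded.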
花伊自在美 2024-11-06 20:15:45

I'd suggest that if it's important to get notification that the job has completed, then write a custom job object and queue that rather than relying upon the default job that gets queued when you call Available.delay.dosomething. Create an object something like:

class DoSomethingAvailableJob

  attr_accessor :options

  def initialize(options = {})
    @options = options
  end

  def perform
    Available.dosomething(@options)
    # Do some sort of notification here
    # ...
  end
end

and enqueue it with:

Delayed::Job.enqueue DoSomethingAvailableJob.new(:var => 1234)
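
As a rough illustration of the "notification" step, here is a sketch in plain Ruby that runs perform directly. The Available class below is a hypothetical stub, and the notifier argument is an assumption of this sketch rather than anything delayed_job provides. A lambda is used only because it is easy to test here; it would not survive being serialized into the jobs table, so a real queued job would more likely hard-code the notification or store something serializable such as a record id.

```ruby
# Hypothetical stub for the Available model from the question.
class Available
  def self.dosomething(options)
    options[:var] * 2
  end
end

class NotifyingJob
  attr_accessor :options

  # notifier is any object responding to call; it is an assumption of this
  # sketch, not part of delayed_job.
  def initialize(options = {}, notifier = ->(_result) {})
    @options = options
    @notifier = notifier
  end

  def perform
    result = Available.dosomething(@options)
    # Notify only after the work has returned without raising.
    @notifier.call(result)
    result
  end
end

notifications = []
job = NotifyingJob.new({ :var => 1234 }, ->(result) { notifications << result })
job.perform
```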
云裳 2024-11-06 20:15:45

The delayed_jobs table in your application is intended to provide the status of running and queued jobs only. It isn't a persistent table, and really should be as small as possible for performance reasons. That's why the jobs are deleted immediately after completion.

Instead, you should add a field to your Available model that signifies that the job is done. Since I'm usually interested in how long the job takes to process, I add start_time and end_time fields. Then my dosomething method would look something like this:

def self.dosomething(model_id)
  model = Model.find(model_id)

  begin
    model.start!

    # do some long work ...
  rescue Exception => e
    # ...
  ensure
    model.finish!
  end
end

The start! and finish! methods just record the current time and save the model. Then I would have a completed? method that your AJAX can poll to see if the job is finished.

def completed?
  return true if start_time and end_time
  return false
end

There are many ways to do this, but I find this method simple, and it works well for me.
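
A minimal sketch of the start!/finish! pair in plain Ruby, under the assumption that they only record timestamps. TimedModel is a hypothetical stand-in that keeps the values in memory instead of saving to a database, and duration is an extra helper added for illustration.

```ruby
# Hypothetical stand-in for the model; a real one would persist the values.
class TimedModel
  attr_reader :start_time, :end_time

  def start!
    @start_time = Time.now
    @end_time = nil
  end

  def finish!
    @end_time = Time.now
  end

  def completed?
    !start_time.nil? && !end_time.nil?
  end

  # Elapsed seconds, or nil while the work has not completed.
  def duration
    completed? ? end_time - start_time : nil
  end
end

# Wraps the long-running work so finish! is recorded even if the work raises.
def dosomething(model)
  model.start!
  yield if block_given?
ensure
  model.finish!
end

model = TimedModel.new
fresh = model.completed?
dosomething(model) { :long_work }
```

Keeping both timestamps, rather than a single boolean, is what makes the processing time available afterwards.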
