任务扇出 - 如何批量添加任务到队列 - 超过 5 个

发布于 2024-09-26 01:07:04 字数 739 浏览 1 评论 0原文

我正在使用一个任务(排队任务)来对多个其他任务进行排队——扇出。当我尝试使用 Queue.add< /a> ,任务参数是任务列表 实例具有超过 5 个元素并且处于事务中...我收到此错误。

JointException: taskqueue.DatastoreError caused by: 
    <class 'google.appengine.api.datastore_errors.BadRequestError'> 
    Too many messages, maximum allowed 5

是否有另一种方法可以在事务中对 5 个以上的任务进行排队?

或者...

也许我不需要事务,因为:

  1. 我不在乎这些任务中的任何一个是否会排队两次,
  2. 如果其中任何一个任务排队失败,那么整个排队任务将重新运行。

那么请告诉我如何在事务中排队 5 个以上的任务,或者告诉我不要使用事务,因为我实际上并不需要事务。

I am using a task (queueing-task) to queue multiple others tasks — fanout. When I try to use Queue.add with task argument being a list of Task instances with more than 5 element's and in transaction… I get this error.

JointException: taskqueue.DatastoreError caused by: 
    <class 'google.appengine.api.datastore_errors.BadRequestError'> 
    Too many messages, maximum allowed 5

Is there another way to queue more than 5 tasks in a transaction?

Or...

Maybe I don't need a transaction, cause:

  1. I don't care if any of those tasks get queued twice anyway, and
  2. if queueing will fail for any of them, then the whole queueing-task will be re run.

So tell me how do I queue more than 5 tasks in a transaction or tell me to not use transaction cause I don't really need one.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

新一帅帅 2024-10-03 01:07:04

解决您的问题的一种接近解决方案是添加一个事务性任务来扇出剩余的任务。只需在现有事务中添加一个扇出任务即可。

除非有业务逻辑原因需要这样做,否则不要重新运行已运行的任务。防止任务被重新插入(即重复)非常简单并且可以节省资源。您的扇出任务基本上如下所示:

class FanOutTask(webapp.RequestHandler):
  def get(self):
    name = self.request.get('name')
    params = deserialize(self.request.get('params'))

    try:
      task_params = params.get('stuff')
      taskqueue.add(url='/worker/1', name=name + '-1', params=task_params)
    except TaskAlreadyExistsError:
      pass

    try:
      task_params = params.get('more')
      taskqueue.add(url='/worker/2', name=name + '-2', params=task_params)
    except TaskAlreadyExistsError:
      pass

以事务方式添加扇出任务可确保其排队。已运行的任务所导致的错误会被捕获并忽略,其他错误会导致扇出任务重新运行。使用这种模式,您可以非常轻松地插入许多子任务。

One solution close to solving your problem is to add one transactional task that fans-out the remaining tasks. Just add the one fan-out task in your existing transaction.

Unless there is a business logic reason to do so, do not re-run a task that has already run. Preventing tasks from being re-inserted (i.e. duplicated) is straightforward and saves resources. Your fan-out task will basically look like:

class FanOutTask(webapp.RequestHandler):
  def get(self):
    name = self.request.get('name')
    params = deserialize(self.request.get('params'))

    try:
      task_params = params.get('stuff')
      taskqueue.add(url='/worker/1', name=name + '-1', params=task_params)
    except TaskAlreadyExistsError:
      pass

    try:
      task_params = params.get('more')
      taskqueue.add(url='/worker/2', name=name + '-2', params=task_params)
    except TaskAlreadyExistsError:
      pass

Adding the fan-out task transactionally ensures it is enqueued. Errors resulting from the task already being run get caught and ignored, other errors cause the fan-out task to re-run. With this pattern you can insert many sub-tasks pretty easily.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文