Writing to an App Engine blob asynchronously and finalizing it when all tasks complete

Posted 2024-12-28 22:16:28

I have a difficult problem.

I am iterating through a set of URLs parameterized by date and fetching them. Here is one example:

somewebservice.com?start=01-01-2012&end=01-10-2012

Sometimes the content returned from the URL gets truncated (random results are missing and a 'truncated error' message is attached) because I've defined too large a range, so I have to split the query into two URLs:

somewebservice.com?start=01-01-2012&end=01-05-2012

somewebservice.com?start=01-06-2012&end=01-10-2012

I do this recursively until the results aren't truncated anymore, and then I write to a blob, which allows concurrent writes.
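
For illustration, the splitting described above could be sketched roughly as follows. This is a local, synchronous sketch only, not the task-queue version: `fetch` is a hypothetical callable that returns `(rows, truncated)`, and `build_url`, `split_range`, and `fetch_all` are made-up helper names. The date format simply mirrors the example URLs.

```python
from datetime import date, timedelta


def build_url(start, end):
    """Format a query URL in the MM-DD-YYYY style of the examples above."""
    return f"somewebservice.com?start={start:%m-%d-%Y}&end={end:%m-%d-%Y}"


def split_range(start, end):
    """Split the inclusive range [start, end] into two adjacent halves."""
    mid = start + timedelta(days=(end - start).days // 2)
    return (start, mid), (mid + timedelta(days=1), end)


def fetch_all(start, end, fetch):
    """Return the (start, end) ranges that were fetched without truncation.

    `fetch` is a hypothetical callable returning (rows, truncated).
    A single-day range is never split further.
    """
    _rows, truncated = fetch(start, end)
    if not truncated or start == end:
        return [(start, end)]
    first, second = split_range(start, end)
    return fetch_all(*first, fetch) + fetch_all(*second, fetch)
```

With this split, the range 01-01-2012 to 01-10-2012 yields the same two halves as the URLs shown above.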

Each of these URL fetch calls/blob writes is handled in a separate task queue task.

The problem is, I can't for the life of me devise a scheme to know when all the tasks have completed. I've tried using sharded counters, but the recursion makes it difficult. Someone suggested I use the Pipeline API, so I watched the Slatkin talk 3 times. It doesn't appear to work with recursion (but I admit I still don't fully understand the lib).

Is there any way to know when a set of task queue tasks (and the children they spawn recursively) have all completed, so I can finalize my blob and do whatever with it?

Thanks,
John

Comments (3)

养猫人 2025-01-04 22:16:28

Have you read the Pipelines Getting Started docs? Pipelines can create other pipelines and wait on them, so doing what you want is fairly straightforward:

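class RecursivePipeline(pipeline.Pipeline):
  def run(self, param):
    if some_condition:  # Too big to process in one go
      # param1 and param2 are placeholders for the two halves of param
      p1 = yield RecursivePipeline(param1)
      p2 = yield RecursivePipeline(param2)
      # Runs only after both child pipelines have finished
      yield RecursiveCombiningPipeline(p1, p2)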

Here, RecursiveCombiningPipeline simply acts as a receiver for the values of the two sub-pipelines.
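
To make the shape of that recursion concrete, here is a small local analogy using `asyncio` from the standard library. It is not the Pipeline API and does not run on the task queue; it only shows the same fan-out/fan-in idea, where a parent resumes once both children (and everything they spawned) are done. The integer ranges, `max_span`, and the `written` list are hypothetical stand-ins for date ranges, the truncation check, and the blob.

```python
import asyncio


async def process(lo, hi, written, max_span=3):
    """Handle the range [lo, hi); split in two when it is too big."""
    if hi - lo > max_span:  # stands in for "the result was truncated"
        mid = (lo + hi) // 2
        # Fan out: both halves run concurrently. gather() resumes only
        # after both halves, including their own children, have finished.
        await asyncio.gather(
            process(lo, mid, written, max_span),
            process(mid, hi, written, max_span),
        )
    else:
        await asyncio.sleep(0)  # stands in for the URL fetch
        written.append((lo, hi))  # stands in for the blob write


async def main():
    written = []
    await process(0, 10, written)
    # Reaching this line means every leaf has written, so it is the
    # point where the blob could be finalized.
    return sorted(written)


result = asyncio.run(main())
```

The point of the analogy is that no counter is needed: completion of the root implies completion of the whole tree, which is the property the combining pipeline gives you.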

征﹌骨岁月お 2025-01-04 22:16:28

Here is an example using the Java Pipeline API:

package com.example;

import com.google.appengine.tools.pipeline.FutureValue;
import com.google.appengine.tools.pipeline.Job1;
import com.google.appengine.tools.pipeline.Job2;
import com.google.appengine.tools.pipeline.Value;

public class PipelineRecursionDemo {

  /**
   * A Job to count the number of letters in a word
   * using recursion
   */
  public static class LetterCountJob extends Job1<Integer, String> {

    public Value<Integer> run(String word) {
      int length = word.length();
      if (length < 2) {
        return immediate(word.length());
      } else {
        int mid = length / 2;
        FutureValue<Integer> first = futureCall(new LetterCountJob(),
            immediate(word.substring(0, mid)));
        FutureValue<Integer> second = futureCall(new LetterCountJob(),
            immediate(word.substring(mid, length)));
        return futureCall(new SumJob(), first, second);
      }
    }
  }

  /**
   * An immediate Job to add two integers
   */
  public static class SumJob extends Job2<Integer, Integer, Integer> {

    public Value<Integer> run(Integer x, Integer y) {
      return immediate(x + y);
    }
  }
}

幻想少年梦 2025-01-04 22:16:28

All right, so here's what I did. I had to modify Mitch's solution just a bit, but he definitely got me in the right direction with the advice to return the future value instead of an immediate one.

I had to create an intermediate DummyJob that takes the output of the recursion:

   public static class DummyJob extends Job1<Void, List<Void>> {
      @Override
      public Value<Void> run(List<Void> dummies) {
         return null;
      }
   }

Then I pass the output of the DummyJob to the blob finalizer in a waitFor:

List<FutureValue<Void>> dummies = new ArrayList<FutureValue<Void>>();
for (Interval in : ins) {
   dummies.add(futureCall(new DataFetcher(), immediate(file), immediate(in.getStart()),
         immediate(in.getEnd())));
}

FutureValue<Void> fv = futureCall(new DummyJob(), futureList(dummies));

return futureCall(new DataWriter(), immediate(file), waitFor(fv));
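
For readers who want to see the same barrier pattern outside the Pipeline library, here is a rough analogy using Python's `concurrent.futures`. It is only a sketch under assumptions: `fetch_interval` and `finalize` are hypothetical stand-ins for DataFetcher and DataWriter, the blob is just an in-memory list, and `wait()` plays the role that DummyJob plus waitFor play above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

_blob_lock = threading.Lock()


def fetch_interval(blob, start, end):
    """Hypothetical stand-in for DataFetcher: write one chunk to the blob."""
    with _blob_lock:
        blob.append((start, end))


def finalize(blob):
    """Hypothetical stand-in for DataWriter: runs once, after all fetches."""
    return sorted(blob)


def run_all(intervals):
    blob = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(fetch_interval, blob, s, e) for s, e in intervals]
        done, _not_done = wait(futures)  # barrier: block until every fetch ends
        for f in done:
            f.result()  # re-raise any exception from a fetch
    return finalize(blob)
```

Calling `run_all` with a list of `(start, end)` pairs returns only after every fetch has finished, which is the guarantee needed before finalizing.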

Thank you Mitch and Nick!!
