Google App Engine: how should I use the task queue for this processing?
I'm using the Python GAE SDK.
I have some processing that needs to be done on 6000+ instances of MyKind
. It is too slow to be done in a single request, so I'm using the task queue. If I make a single task process only one entity, then it should take only a few seconds.
The documentation says that only 100 tasks can be added in a "batch". (What do they mean by that? In one request? In one task?)
So, assuming that "batch" means "request", I'm trying to figure out what the best way is to create a task for each entity in the datastore. What do you think?
It's easier if I can assume that the order of MyKind
will never change. (The processing will never actually change the MyKind
instances - it only creates new instances of other types.) I could just make a bunch of tasks, giving each one an offset of where to start, spaced less than 100 apart. Then, each task could create individual tasks that do the actual processing.
But what if there are so many entities that the original request can't add all the necessary scheduling tasks? This makes me think I need a recursive solution - each task looks at the range it is given. If only one element is present in the range, it does processing on it. Otherwise, it subdivides the range further into subsequent tasks.
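Roughly what I have in mind for the recursive version is something like this (untested, just to illustrate the idea; the /subdivide URL, the handler, and process() are placeholders):

    # Untested sketch of the recursive idea; URL and helper names are placeholders.
    from google.appengine.api import taskqueue
    from google.appengine.ext import webapp

    class SubdivideHandler(webapp.RequestHandler):
        def post(self):
            start = int(self.request.get('start'))
            end = int(self.request.get('end'))  # exclusive

            if end - start == 1:
                # Only one entity left in the range: process it.
                entity = MyKind.all().fetch(1, offset=start)[0]
                process(entity)  # placeholder for the real work
            else:
                # Otherwise split the range in half and hand each half to a new task.
                mid = (start + end) // 2
                taskqueue.add(url='/subdivide', params={'start': start, 'end': mid})
                taskqueue.add(url='/subdivide', params={'start': mid, 'end': end})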
If I can't count on using offsets and limits to identify entities (because their ordering isn't ensured to be constant), maybe I could just use their keys? But then I could be sending 1000s of keys around, which seems unwieldy.
Am I going down the right path here, or is there another design I should consider?
2 Answers
When you run code like
taskqueue.add(url='/worker', params={'cursor': cursor})
you are enqueueing a task: scheduling a request to execute out of band using the parameters you provide. You can apparently schedule up to 100 of these in one operation. I don't think you want to, though. Task chaining would make this a lot simpler:
Your worker task would do something like this:
Run a query to fetch some records for processing. If a cursor was provided in the task params, use it. Limit the query to 10 records, or whatever you think can finish in 30 seconds.
Process your 10 records.
If your query returned 10 records, enqueue another task and pass it the updated cursor from your query so it can pick up where you left off.
If you got fewer than 10 records, you're done. Hooray! Fire off an email or something and quit.
With this route, you only need to kick off the first task, and the rest add themselves.
Note that if a task fails, App Engine will retry it until it succeeds, so you don't need to worry about a datastore hiccup causing one task to timeout and break the chain.
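Here's a rough sketch of what a chained worker like that could look like, assuming webapp2 and the ndb API, with the /worker URL and the process() function standing in for your actual handler and processing code:

    # Rough sketch only; the handler URL, the MyKind query, and process() are placeholders.
    from google.appengine.api import taskqueue
    from google.appengine.ext import ndb
    import webapp2

    BATCH_SIZE = 10  # whatever you can reliably finish inside the request deadline

    class WorkerHandler(webapp2.RequestHandler):
        def post(self):
            cursor = None
            if self.request.get('cursor'):
                cursor = ndb.Cursor(urlsafe=self.request.get('cursor'))

            # Fetch the next batch, resuming from the cursor if one was passed in.
            entities, next_cursor, more = MyKind.query().fetch_page(
                BATCH_SIZE, start_cursor=cursor)

            for entity in entities:
                process(entity)  # creates the new entities of other kinds

            if more:
                # Chain: hand the updated cursor to the next task in line.
                taskqueue.add(url='/worker',
                              params={'cursor': next_cursor.urlsafe()})
            # Otherwise the chain is done -- send your email or log it here.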
Edit:
The steps above do not guarantee that an entity will be processed only once. Tasks should generally run only once, but Google does advise you to design for idempotence. If it's a major concern, here's one way to handle it:
Put a status flag on each entity to be processed, or create a complementary entity to hold the flag. It should have states akin to Pending, Processing, and Processed.
When you fetch a new entity to process, transactionally lock it and increment the processing flag. Only process the entity if it was Pending. When processing finishes, increment the flag again.
Note that it's not strictly necessary to add the processing flag to every entity before you start. Your "pending" state can just mean the property or corresponding entity doesn't exist yet.
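With ndb, the transactional claim could look something like this; the WorkStatus model, its state values, and the function names are just illustrations:

    # Illustrative only: WorkStatus and its state values are made-up names.
    from google.appengine.ext import ndb

    PENDING, PROCESSING, PROCESSED = 0, 1, 2

    class WorkStatus(ndb.Model):
        # One status entity per MyKind entity, keyed by that entity's urlsafe key.
        state = ndb.IntegerProperty(default=PENDING)

    @ndb.transactional
    def claim(entity_key):
        """Atomically move a record from Pending to Processing; True if we got it."""
        status = WorkStatus.get_or_insert(entity_key.urlsafe())
        if status.state != PENDING:
            return False  # another task already claimed or finished it
        status.state = PROCESSING
        status.put()
        return True

    @ndb.transactional
    def mark_done(entity_key):
        status = WorkStatus.get_by_id(entity_key.urlsafe())
        status.state = PROCESSED
        status.put()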
Also, depending on your design, you can do what I did, which is to number all of the records that need to be processed. I process about 3500 items, each taking around 3 seconds. To avoid overlap and timeouts, and to allow for future growth, my first task fetches the list of all the unique items of that kind from the database. It then divides that list into chunks of 500 item identifiers, looping until it has accounted for every unique item in the database, and posts each chunk of 500 identifiers to a second tier of handler tasks. Each of the second-tier handler tasks (there are currently seven or eight of them) gets a unique list of 500 items, and each of them adds 500 tasks, one for each unique identifier.
Since it's all managed through loops and counting based on the number of unique items in my database, I can add as many unique items as I want and the number of tasks will expand to accommodate them, with no duplication at all. I use it for tracking prices in a game on a daily basis, so the whole thing is fired off by a cron job and requires no intervention on my part.
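For the last step, each second-tier handler can enqueue its 500 tasks in batches of up to 100 per add() call, which seems to be the "batch" the documentation is talking about. A sketch, with the queue name, URL, and parameter name as placeholders:

    # Sketch only; queue name, URL, and the 'key' parameter are placeholders.
    from google.appengine.api import taskqueue

    def fan_out(urlsafe_keys):
        """Enqueue one processing task per identifier, up to 100 tasks per add() call."""
        queue = taskqueue.Queue('default')
        tasks = [taskqueue.Task(url='/process', params={'key': k})
                 for k in urlsafe_keys]
        for i in range(0, len(tasks), 100):
            queue.add(tasks[i:i + 100])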