App Engine: best way to check for data updates in the datastore while avoiding datastore writes

Posted 2024-11-15 05:48:43


I have a large number of entities (products) in my datastore which come from an external data source. I want to check them for updates daily.

Some items are already updated because the application fetched them directly. Some are newly inserted and don't need updates.

For the ones that have not been fetched, I have cron jobs running. I use the Python API.

At the moment I do the following.

I have a field

dateupdated = db.DateTimeProperty(auto_now_add=True)

I can then use

query = dbmodel.product.all()
query.filter('dateupdated <', newdate)
query.order('dateupdated')        
results = query.fetch(limit=mylimit, offset=myoffset)

to pick the oldest entries and schedule them for update. I use the Task Queue with custom task names to make sure each product update is run only once a day.
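
For illustration, the dedup-by-name trick looks roughly like this (a simplified sketch, not my exact code; the helper name and URL are made up). The queue rejects a task whose name was already used recently, so a date-stamped name limits each product to one update task per day:

from google.appengine.api import taskqueue

def schedule_product_update(product_key, today):
    # Encode the day into the task name so the same product can only
    # be enqueued once per day (task names allow letters, digits,
    # hyphens and underscores).
    task_name = 'update-%s-%s' % (product_key.id_or_name(),
                                  today.strftime('%Y-%m-%d'))
    try:
        taskqueue.add(url='/tasks/update_product',
                      name=task_name,
                      params={'key': str(product_key)})
    except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
        # This product's update task already exists (or already ran) today.
        pass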

The problem is that I need to update the dateupdated field, which means a datastore write, even if a product's data has not changed, just to keep track of the update process.

This consumes a lot of resources (CPU hours, Datastore API calls, etc.).

Is there a better way to perform such a task and avoid the unnecessary datastore writes?


Comments (1)

财迷小姐 2024-11-22 05:48:43


Yes, use cursors

By ordering a query by dateupdated and then storing a cursor after you have processed your entities, you can re-run the same query later to get only the items updated after your last query.

So, given a class like

from google.appengine.ext import db

class MyEntity(db.Model):
    dateupdated = db.DateTimeProperty(auto_now_add=True)

You could set up a handler to run as a task, like:

from google.appengine.api import taskqueue
from google.appengine.ext import webapp

class ProcessNewEntities(webapp.RequestHandler):
    def get(self):
        """Run via a task to process batches of 'batch_size'
        recently updated entities."""
        # number of entities to process per task execution
        batch_size = 100
        # build the basic query
        q = MyEntity.all().order("dateupdated")
        # resume from the previous task's cursor, if one was passed
        cursor = self.request.get("cursor")
        if cursor:
            q.with_cursor(cursor)
        # fetch the batch
        entities = q.fetch(batch_size)
        for entity in entities:
            # process the entity
            do_your_processing(entity)
        # queue up the next task to process the next batch;
        # if we have no more to process then delay that task
        # for a while so that it doesn't hog the application
        delay = 600 if len(entities) < batch_size else 0
        taskqueue.add(
            url='/tasks/process_new_entities',
            params={'cursor': q.cursor()},
            countdown=delay)

and then you just need to trigger the first task execution, like:

def start_processing_entities():
    taskqueue.add(url='/tasks/process_new_entities')
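
To complete the picture, the handler needs to be mapped to the /tasks/process_new_entities URL. A minimal, illustrative wiring with the webapp framework (assuming the handler above lives in the same module) would be:

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

# Map the task URL used above to the handler.
application = webapp.WSGIApplication([
    ('/tasks/process_new_entities', ProcessNewEntities),
])

def main():
    run_wsgi_app(application)

if __name__ == '__main__':
    main()

You can call start_processing_entities() from your daily cron handler so the whole chain kicks off once a day.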