How do I summarize data in Google AppEngine?

Posted 2024-10-27 03:57:33

I'm trying to implement a summary view onto a large(ish) data set using AppEngine.

My model looks something like:

class TxRecord(db.Model):
    expense_type = db.StringProperty()
    amount = db.IntegerProperty()

class ExpenseType(db.Model):
    name = db.StringProperty()
    total = db.IntegerProperty(default=0)  # default so += starts from 0

My datastore contains 100K instances of TxRecord and I'd like to summarise these by expense_type.

In sql it would be something like:

select expense_type as name, sum(amount) as total 
    from TxRecord
    group by expense_type
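(The SQL above is a plain group-by-sum; as an illustrative sketch in ordinary Python, over in-memory `(expense_type, amount)` pairs rather than datastore entities, it amounts to:)

```python
from collections import defaultdict

def summarize(records):
    """Group (expense_type, amount) pairs and sum amounts per type,
    mirroring the GROUP BY query above."""
    totals = defaultdict(int)
    for expense_type, amount in records:
        totals[expense_type] += amount
    return dict(totals)
```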

What I'm currently doing is using the Python MapReduce framework to iterate over all of the TxRecords using the following mapper:

def generate_expense_type(rec):
    expense_type = ExpenseType.get_or_insert(rec.expense_type,
                                             name=rec.expense_type)
    expense_type.total += rec.amount

    yield op.db.Put(expense_type)

This seems to work, but I feel I have to run it using a shard_count of 1 in order to ensure that the total isn't over written with concurrent writes.
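(For context, a mapper like this is registered with the Python MapReduce framework in a `mapreduce.yaml` file; the job name and module paths below are assumptions, not part of the original question:)

```yaml
mapreduce:
- name: SummarizeExpenses            # job name is arbitrary
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: summaries.generate_expense_type   # assumed module path
    params:
    - name: entity_kind
      default: models.TxRecord                 # assumed module path
```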

Is there a strategy that I can use to over come this issue using AppEngine or is that it?

3 Answers

两人的回忆 2024-11-03 03:57:33

Using mapreduce is the right approach. Counters, as David suggests, are one option, but they're not reliable (they use memcache), and they're not designed for massive numbers of counters to be kept in parallel.

Your current mapreduce has a couple of issues: First, get_or_insert executes a datastore transaction every time it's called. Second, you then update the amount outside the transaction and asynchronously store it a second time, generating the concurrency issue you were concerned about.

At least until reduce is fully supported, your best option is to do the whole update in the mapper in a transaction, like this:

def generate_expense_type(rec):
    def _tx():
      expense_type = ExpenseType.get_by_key_name(rec.expense_type)
      if not expense_type:
        expense_type = ExpenseType(key_name=rec.expense_type,
                                   name=rec.expense_type, total=0)
      expense_type.total += rec.amount
      expense_type.put()
    db.run_in_transaction(_tx)
凝望流年 2024-11-03 03:57:33

Using the MapReduce framework is a good idea. You could use more than one shard if you utilize the counters provided by the MapReduce framework. So instead of modifying the datastore each time, you could do something like this:

yield op.counters.Increment("total_<expense_type_name>", rec.amount)

After the MapReduce finishes (hopefully much more quickly than when you were using just one shard), then you can copy the finalized counters into your datastore entity.
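(To illustrate why this is safe with multiple shards: each shard accumulates its own counters independently, and the framework merges them when the job completes, so no two shards ever write to the same counter concurrently. A plain-Python sketch of that behavior, using the `total_<expense_type_name>` naming convention from the answer; the shard data here is made up:)

```python
from collections import Counter

def run_shard(records):
    """Accumulate per-type counters for one shard, mimicking
    op.counters.Increment("total_<type>", amount)."""
    counters = Counter()
    for expense_type, amount in records:
        counters["total_" + expense_type] += amount
    return counters

def merge_shards(shard_counters):
    """Combine per-shard counters, as the framework does on completion."""
    merged = Counter()
    for counters in shard_counters:
        merged.update(counters)  # Counter.update adds values, not replaces
    return merged
```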

一曲琵琶半遮面シ 2024-11-03 03:57:33

MapReduce is great for offline processing of data, and I like David's solution for handling the counters (+1 upvote).

I just wanted to mention another option: process the data as it comes in. Check out Brett Slatkin's High Throughput Data Pipelines on App Engine talk from IO 2010.

I've implemented the technique in a simple framework (slagg), you might find my example of grouping with date rollup useful.
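(A minimal plain-Python sketch of the "process it as it comes in" idea, with illustrative names: keep the running total up to date at write time instead of recomputing it later in a batch job:)

```python
class RollingSummary(object):
    """Running per-type totals, updated as each record arrives."""

    def __init__(self):
        self.totals = {}

    def record(self, expense_type, amount):
        # On App Engine this update would run in a transaction (or be
        # fanned in via task queues, as the linked talk describes).
        self.totals[expense_type] = self.totals.get(expense_type, 0) + amount
```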
