在 Google App Engine 中使用 mapreduce 的简单反例

发布于 2024-11-08 22:54:59 字数 1005 浏览 4 评论 0原文

我对 GAE 中 MapReduce 支持的当前状态有些困惑。根据文档http://code.google.com/p/appengine-mapreduce/减少阶段是尚不支持,但在 I/O 2011 会议的描述中 (http://www.youtube.com/ watch?v=EIxelKcyCC0 )它写道“现在可以在 App Engine 上运行完整的 MapReduce 作业”。我想知道我是否可以在这个任务中使用mapreduce:

我想要做什么:

我有带有字段颜色的模型汽车:

class Car(db.Model):
    color = db.StringProperty()

我想运行mapreduce进程(不时地,cron定义的),它可以计算每种颜色有多少辆汽车,并将此结果存储在数据存储中。似乎是一项非常适合mapreduce的工作(但如果我错了,请纠正我),阶段“map”将为每个汽车实体生成对(,1),并且阶段“reduce”应该按color_name合并这些数据,给我预期的结果。我想要获得的最终结果是具有存储在数据存储中的计算数据的实体,如下所示:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()

问题: 我不知道如何在 appengine 中实现这一点...视频显示了定义的映射和化简函数的示例,但它们似乎是与数据存储无关的非常通用的示例。我发现的所有其他示例都使用一个函数来处理来自 DatastoreInputReader 的数据,但它们似乎只是“映射”阶段,没有说明如何执行“减少”(以及如何将减少结果存储在数据存储)。

I'm somewhat confused with the current state of mapreduce support in GAE. According to the docs http://code.google.com/p/appengine-mapreduce/ reduce phase isn't supported yet, but in the description of the session from I/O 2011 ( http://www.youtube.com/watch?v=EIxelKcyCC0 ) it's written "It is now possible to run full Map Reduce jobs on App Engine". I wonder if I can use mapreduce in this task:

What I want to do:

I have model Car with field color:

class Car(db.Model):
    color = db.StringProperty()

I want to run mapreduce process (from time to time, cron-defined) which can compute how many cars are in each color ans store this result in the datastore. Seems like a job well suited for mapreduce (but if I'm wrong correct me), phase "map" will yield pairs (, 1) for each Car entity, and phase "reduce" should merge this data by color_name giving me expected results. Final result I want to get are entities with computed data stored in the datastore, something like that:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()

Problem:
I don't know how to implement this in appengine ... The video shows examples with defined map and reduce functions, but they seem to be very general examples not related to the datastore. All other examples that i found are using one function to process the data from DatastoreInputReader, but they seem to be only the "map" phase, there is no example of how to do the "reduce" (and how to store reduce results in the datastore).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

一笔一画续写前缘 2024-11-15 22:54:59

您实际上并不需要减少阶段。您可以使用线性任务链来完成此任务,或多或少如下:

def count_colors(limit=100, totals={}, cursor=None):
  query = Car.all()
  if cursor:
    query.with_cursor(cursor)
  cars = query.fetch(limit)
  for car in cars:
    try:
      totals[car.color] += 1
    except KeyError:
      totals[car.color] = 1
  if len(cars) == limit:
    cursor = query.cursor()
    return deferred.defer(count_colors, limit, totals, cursor)
  entities = []
  for color in totals:
    entity = CarsByColor(key_name=color)
    entity.cars_num = totals[color]
    entities.append(entity)
  db.put(entities)

deferred.defer(count_colors)

这应该迭代所有汽车,将查询游标和运行计数传递给一系列临时任务,并在最后存储总计。

如果您必须合并来自多个数据存储、多个模型或单个模型中的多个索引的数据,则缩减阶段可能很有意义。我认为它不会给你买任何东西。

另一种选择:使用任务队列来维护每种颜色的实时计数器。当您创建汽车时,启动一项任务来增加该颜色的总数。当您更新一辆汽车时,启动一项任务来减少旧颜色,并启动另一项任务来增加新颜色。以事务方式更新计数器以避免竞争条件。

You don't really need a reduce phase. You can accomplish this with a linear task chain, more or less as follows:

def count_colors(limit=100, totals={}, cursor=None):
  query = Car.all()
  if cursor:
    query.with_cursor(cursor)
  cars = query.fetch(limit)
  for car in cars:
    try:
      totals[car.color] += 1
    except KeyError:
      totals[car.color] = 1
  if len(cars) == limit:
    cursor = query.cursor()
    return deferred.defer(count_colors, limit, totals, cursor)
  entities = []
  for color in totals:
    entity = CarsByColor(key_name=color)
    entity.cars_num = totals[color]
    entities.append(entity)
  db.put(entities)

deferred.defer(count_colors)

This should iterate over all your cars, pass a query cursor and a running tally to a series of ad-hoc tasks, and store the totals at the end.

A reduce phase might make sense if you had to merge data from multiple datastores, multiple models, or multiple indexes in a single model. As is I don't think it would buy you anything.

Another option: use the task queue to maintain live counters for each color. When you create a car, kick off a task to increment the total for that color. When you update a car, kick off one task to decrement the old color and another to increment the new color. Update counters transactionally to avoid race conditions.

静谧幽蓝 2024-11-15 22:54:59

我在这里提供我最终使用GAE 中的mapreduce 找到的解决方案(没有reduce 阶段)。如果我从头开始,我可能会使用Drew Sears提供的解决方案。

它适用于 GAE python 1.5.0

app.yaml中,我添加了mapreduce的处理程序:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

以及mapreduce代码的处理程序(我使用url /mapred_update来收集mapreduce生成的结果) :

- url: /mapred_.*
  script: mapred.py

创建ma​​preduce.yaml用于处理汽车实体:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

说明:done_callback是mapreduce完成操作后调用的url。 ma​​pred.process是一个处理单个实体并更新计数器的函数(它在mapred.py文件中定义)。 中定义

模型 Car 在 models.py ma​​pred.py

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters"""
    def post(self):
        """Called after mapreduce operation are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
                                     [('/mapred_update', UpdateCounters)],
                                     debug=True)
def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()            

:与问题相比,CarsByColor 模型的定义略有变化。

您可以从 url 手动启动 mapreduce 作业: http://yourapp/mapreduce/ 并希望从 cron 启动(我还没有尚未测试 cron)。

I'm providing here solution I figured out eventually using mapreduce from GAE (without reduce phase). If I had started from scratch I probably would have used solution provided by Drew Sears.

It works in GAE python 1.5.0

In app.yaml I added the handler for mapreduce:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

and the handler for my code for mapreduce (I'm using url /mapred_update to gather the results produced by mapreduce):

- url: /mapred_.*
  script: mapred.py

Created mapreduce.yaml for processing Car entities:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

Explanation: done_callback is an url that is called after mapreduce finishes its operations. mapred.process is a function that process individual entity and update counters (it's defined in mapred.py file). Model Car is defined in models.py

mapred.py:

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters"""
    def post(self):
        """Called after mapreduce operation are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
                                     [('/mapred_update', UpdateCounters)],
                                     debug=True)
def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()            

There is slightly changed definition of CarsByColor model compared to question.

You can start the mapreduce job manually from url: http://yourapp/mapreduce/ and hopefully from cron (I haven't tested the cron yet).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文