Google App Engine 中多种数据存储类型上的 MapReduce

发布于 2024-09-24 02:40:00 字数 1734 浏览 5 评论 0原文

我刚刚观看了 Batch使用 Google I/O 2010 的 App Engine 会议进行数据处理,请阅读 MapReduce 的部分内容来自 Google Research 的文章,现在我正在考虑使用 Google App Engine 上的 MapReduce< /a> 在 Python 中实现推荐系统。

我更喜欢使用 appengine-mapreduce 而不是 Task Queue API,因为前者可以轻松迭代某种类型的所有实例、自动批处理、自动任务链接等。问题是:我的推荐系统需要计算两个不同模型的实例之间的相关性,即两种不同类型的实例。

例子: 我有这两个模型:用户和项目。每个都有一个标签列表作为属性。以下是计算用户和项目之间相关性的函数。请注意,应该为用户和项目的每个组合调用 calculateCorrelation

def calculateCorrelation(user, item):
    return calculateCorrelationAverage(u.tags, i.tags)

def calculateCorrelationAverage(tags1, tags2):
    correlationSum = 0.0
    for (tag1, tag2) in allCombinations(tags1, tags2):
        correlationSum += correlation(tag1, tag2)
    return correlationSum / (len(tags1) + len(tags2))

def allCombinations(list1, list2):
    combinations = []
    for x in list1:
        for y in list2:
            combinations.append((x, y))
    return combinations             

但是 calculateCorrelation 不是 appengine-mapreduce 中的有效映射器,并且该函数甚至可能与 MapReduce 不兼容计算概念。然而,我需要确定...拥有那些 appengine-mapreduce 优势(例如自动批处理和任务链接)对我来说真的很棒。

有什么解决方案吗?

我应该定义自己的 InputReader 吗? 读取两种不同类型的所有实例的新 InputReader 与当前的 appengine-mapreduce 实现兼容吗?

或者我应该尝试以下操作?

  • 将这两种实体的所有键两两组合到新模型的实例中(可能使用 MapReduce)
  • 使用映射器对此新模型的实例进行
  • 迭代每个实例,使用其中的键来获取两个不同类型的实体并计算它们之间的相关性。

I just watched Batch data processing with App Engine session of Google I/O 2010, read some parts of MapReduce article from Google Research and now I am thinking to use MapReduce on Google App Engine to implement a recommender system in Python.

I prefer using appengine-mapreduce instead of Task Queue API because the former offers easy iteration over all instances of some kind, automatic batching, automatic task chaining, etc. The problem is: my recommender system needs to calculate correlation between instances of two different Models, i.e., instances of two distinct kinds.

Example:
I have these two Models: User and Item. Each one has a list of tags as an attribute. Below are the functions to calculate correlation between users and items. Note that calculateCorrelation should be called for every combination of users and items:

def calculateCorrelation(user, item):
    return calculateCorrelationAverage(u.tags, i.tags)

def calculateCorrelationAverage(tags1, tags2):
    correlationSum = 0.0
    for (tag1, tag2) in allCombinations(tags1, tags2):
        correlationSum += correlation(tag1, tag2)
    return correlationSum / (len(tags1) + len(tags2))

def allCombinations(list1, list2):
    combinations = []
    for x in list1:
        for y in list2:
            combinations.append((x, y))
    return combinations             

But that calculateCorrelation is not a valid Mapper in appengine-mapreduce and maybe this function is not even compatible with MapReduce computation concept. Yet, I need to be sure... it would be really great for me having those appengine-mapreduce advantages like automatic batching and task chaining.

Is there any solution for that?

Should I define my own InputReader? A new InputReader that reads all instances of two different kinds is compatible with the current appengine-mapreduce implementation?

Or should I try the following?

  • Combine all keys of all entities of these two kinds, two by two, into instances of a new Model (possibly using MapReduce)
  • Iterate using mappers over instances of this new Model
  • For each instance, use keys inside it to get the two entities of different kinds and calculate the correlation between them.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

债姬 2024-10-01 02:40:00

根据 Nick Johnson 的建议,我编写了自己的 InputReader。该读取器从两种不同类型中获取实体。它生成包含这些实体的所有组合的元组。如下:

class TwoKindsInputReader(InputReader):
    _APP_PARAM = "_app"
    _KIND1_PARAM = "kind1"
    _KIND2_PARAM = "kind2"
    MAPPER_PARAMS = "mapper_params"

    def __init__(self, reader1, reader2):
        self._reader1 = reader1
        self._reader2 = reader2

    def __iter__(self):
        for u in self._reader1:
            for e in self._reader2:
                yield (u, e)

    @classmethod
    def from_json(cls, input_shard_state):
        reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
        reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])

        return cls(reader1, reader2)

    def to_json(self):
        json_dict = {}
        json_dict[self._KIND1_PARAM] = self._reader1.to_json()
        json_dict[self._KIND2_PARAM] = self._reader2.to_json()
        return json_dict

    @classmethod
    def split_input(cls, mapper_spec):
        params = mapper_spec.params
        app = params.get(cls._APP_PARAM)
        kind1 = params.get(cls._KIND1_PARAM)
        kind2 = params.get(cls._KIND2_PARAM)
        shard_count = mapper_spec.shard_count
        shard_count_sqrt = int(math.sqrt(shard_count))

        splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
        splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
        inputs = []

        for u in splitted1:
            for e in splitted2:
                inputs.append(TwoKindsInputReader(u, e))

        #mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
        return inputs

    @classmethod
    def validate(cls, mapper_spec):
        return True #TODO

当您需要处理两种实体的所有组合时,应该使用此代码。您还可以将其推广到两种以上的实体。

这是 TwoKindsInputReader 的有效 mapreduce.yaml:

mapreduce:
- name: recommendationMapReduce
  mapper:
    input_reader: customInputReaders.TwoKindsInputReader
    handler: recommendation.calculateCorrelationHandler
    params:
    - name: kind1
      default: kinds.User
    - name: kind2
      default: kinds.Item
    - name: shard_count
      default: 16

Following Nick Johnson suggestion, I wrote my own InputReader. This reader fetch entities from two different kinds. It yields tuples with all combinations of these entities. Here it is:

class TwoKindsInputReader(InputReader):
    _APP_PARAM = "_app"
    _KIND1_PARAM = "kind1"
    _KIND2_PARAM = "kind2"
    MAPPER_PARAMS = "mapper_params"

    def __init__(self, reader1, reader2):
        self._reader1 = reader1
        self._reader2 = reader2

    def __iter__(self):
        for u in self._reader1:
            for e in self._reader2:
                yield (u, e)

    @classmethod
    def from_json(cls, input_shard_state):
        reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
        reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])

        return cls(reader1, reader2)

    def to_json(self):
        json_dict = {}
        json_dict[self._KIND1_PARAM] = self._reader1.to_json()
        json_dict[self._KIND2_PARAM] = self._reader2.to_json()
        return json_dict

    @classmethod
    def split_input(cls, mapper_spec):
        params = mapper_spec.params
        app = params.get(cls._APP_PARAM)
        kind1 = params.get(cls._KIND1_PARAM)
        kind2 = params.get(cls._KIND2_PARAM)
        shard_count = mapper_spec.shard_count
        shard_count_sqrt = int(math.sqrt(shard_count))

        splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
        splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
        inputs = []

        for u in splitted1:
            for e in splitted2:
                inputs.append(TwoKindsInputReader(u, e))

        #mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
        return inputs

    @classmethod
    def validate(cls, mapper_spec):
        return True #TODO

This code should be used when you need to process all combinations of entities of two kinds. You can also generalize this for more than two kinds.

Here it is a valid the mapreduce.yaml for TwoKindsInputReader:

mapreduce:
- name: recommendationMapReduce
  mapper:
    input_reader: customInputReaders.TwoKindsInputReader
    handler: recommendation.calculateCorrelationHandler
    params:
    - name: kind1
      default: kinds.User
    - name: kind2
      default: kinds.Item
    - name: shard_count
      default: 16
痴意少年 2024-10-01 02:40:00

如果没有您实际计算的更多细节,就很难知道应该推荐什么。一种简单的选择是简单地在映射调用中获取相关实体 - 没有什么可以阻止您在那里执行数据存储操作。

不过,这会导致很多小调用。正如您所建议的,编写自定义 InputReader 将允许您并行获取两组实体,这将显着提高性能。

如果您提供有关如何加入这些实体的更多详细信息,我们也许能够提供更具体的建议。

It's difficult to know what to recommend without more details of what you're actually calculating. One simple option is to simply fetch the related entity inside the map call - there's nothing preventing you from doing datastore operations there.

This will result in a lot of small calls, though. Writing a custom InputReader, as you suggest, will allow you to fetch both sets of entities in parallel, which will significantly improve performance.

If you give more details as to how you need to join these entities, we may be able to provide more concrete suggestions.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文