Apache Beam - Using the output of one aggregation in another aggregation

Posted 2025-01-19 11:56:04


I am new to Apache Beam, coming over from PySpark's DataFrame API. I'm having trouble using the output of one Beam computation in another. Basically, I want to perform an aggregation that yields a single value (such as an average) and use that result as a Python primitive (e.g. a float) in a subsequent aggregation. For example:

import apache_beam as beam

DATA = [
  beam.Row(val="hello"),
  beam.Row(val="stackoverflow,"),
  beam.Row(val="plz"),
  beam.Row(val="halp"),
]

with beam.Pipeline() as pipe:
  graph = pipe | beam.Create(DATA)
  average_word_length = (graph 
    | "Get lengths" >> beam.ParDo(lambda row: beam.Row(length=len(row.val)))
    | "Compute mean" >> beam.combiners.Mean.Globally()
    | "Print avg" >> beam.Map(print)
  )
  # average_word_length is a PCollection with a single value: 6.5

  (graph 
    | "Compute metric" >> beam.ParDo(lambda row: beam.Row(newval=len(row.val)/average_word_length)) # fails here
    | beam.Map(print)
  )

This fails with a TypeError because I'm trying to divide an int by a PCollection... is there a way to extract the one float value from the average_word_length PCollection and use it as a float in the next aggregation? If not, how do I achieve something similar?
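For reference, here is the arithmetic the pipeline is meant to perform, sketched in plain Python (not Beam code; the names mirror the pipeline above):

```python
# Each word's length, the global mean length, then each length
# divided by that mean -- the "newval" the pipeline should emit.
words = ["hello", "stackoverflow,", "plz", "halp"]

lengths = [len(w) for w in words]                   # [5, 14, 3, 4]
average_word_length = sum(lengths) / len(lengths)   # 6.5
newvals = [l / average_word_length for l in lengths]

print(average_word_length)  # 6.5
print(newvals)
```

The challenge is expressing the last step in Beam, where `average_word_length` lives inside a PCollection rather than as a plain float.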


1 Answer

少女净妖师 2025-01-26 11:56:04


What you need is a Side Input. See the section on side inputs in the Beam programming guide.

Working solution (note that I removed one of the prints so that there's an actual output there):

import apache_beam as beam
from apache_beam import pvalue


DATA = [
  beam.Row(val="HOPE"),
  beam.Row(val="THIS,"),
  beam.Row(val="WORKS"),
  beam.Row(val="BLABLABLA"),
]

with beam.Pipeline() as pipe:
  graph = pipe | beam.Create(DATA)
  average_word_length = (graph 
    | "Get lengths" >> beam.ParDo(lambda row: beam.Row(length=len(row.val)))
    | "Compute mean" >> beam.combiners.Mean.Globally()
  )
  # average_word_length is a PCollection with a single value: 5.75

  (graph
    | "Compute metric" >> beam.ParDo(lambda row, side: beam.Row(newval=len(row.val)/side),
                                     side=pvalue.AsSingleton(average_word_length))
    | beam.Map(print)
  )