Apache Beam blocked on unbounded side input

Posted 2025-01-27 00:03:34

My question is very similar to another post: Apache Beam Cloud Dataflow Streaming Stuck Side Input.

However, I tried the resolution there (applying GlobalWindows() to the side input), and it did not seem to fix my problem.

I have a Dataflow pipeline written with the Python SDK (I'm using the DirectRunner for debugging) where the main input is logs from PubSub and the side input is associated data from a mostly unchanging database. I would like to join the two so that each log is paired with side input data from approximately the same time. Excess side inputs without an associated log can be dropped.
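To make the intended pairing concrete, here is a plain-Python toy model (no Beam involved) of what a windowed side-input join computes. It assumes both inputs are cut into the same fixed windows and that each main element is matched against the side window containing its timestamp. The names `window_start` and `windowed_cross_join`, the 5-second window size and the sample data are hypothetical.

```python
from collections import defaultdict


def window_start(ts, size):
    """Start of the fixed window [start, start + size) that contains ts."""
    return ts - ts % size


def windowed_cross_join(main, side, size):
    """Toy model (not Beam API): pair each main element with every side
    element that falls in the same fixed window.

    main and side are lists of (timestamp, value) tuples. Side elements whose
    window holds no main element are never emitted, i.e. they are dropped.
    """
    side_by_window = defaultdict(list)
    for ts, value in side:
        side_by_window[window_start(ts, size)].append(value)

    out = []
    for ts, value in main:
        for right in side_by_window.get(window_start(ts, size), []):
            out.append((value, right))
    return out


main = [(0, 'log0'), (1, 'log1'), (6, 'log6')]
side = [(0, 'db@0'), (5, 'db@5'), (10, 'db@10')]
print(windowed_cross_join(main, side, 5))
# [('log0', 'db@0'), ('log1', 'db@0'), ('log6', 'db@5')]
```

In this model, `db@10` is dropped because no log falls in window [10, 15), which matches the "excess side inputs can be dropped" requirement.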

The behavior I see is that the pipeline seems to operate as a single thread: it processes all side input elements first, then starts processing the main input elements. If the side input is bounded (non-streaming), this is fine, and the pipeline can merge the inputs and run to completion. If the side input is unbounded (streaming), however, the main input is blocked indefinitely, apparently waiting for side input processing to finish.

To illustrate the behavior, I made the simplified test case below.

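import logging

import apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils import timestamp


class Logger(apache_beam.DoFn):
  """Logs every element under a given name and passes it through unchanged."""

  def __init__(self, name):
    self._name = name

  def process(self, element, w=apache_beam.DoFn.WindowParam,
              ts=apache_beam.DoFn.TimestampParam):
    # w (window) and ts (timestamp) are unused here but handy when debugging.
    # logging.error is used so output appears without configuring logging.
    logging.error('%s: %s', self._name, element)
    yield element


def cross_join(left, rights):
  # Pair the main-input element with every element of the side input.
  for right in rights:
    yield (left, right)


def main():
  # Assumed setup: a streaming pipeline, since PeriodicImpulse can be unbounded.
  pipeline = apache_beam.Pipeline(options=PipelineOptions(streaming=True))
  start = timestamp.Timestamp.now()

  # Bounded side inputs work OK.
  stop = start + 20

  # Unbounded side inputs appear to block execution of main input
  # processing.
  # stop = timestamp.MAX_TIMESTAMP

  side_interval = 5
  main_interval = 1

  # Explicit labels are required: applying PeriodicImpulse and
  # ParDo(Logger) twice without them raises a duplicate-label error.
  side_input = (
      pipeline
      | 'SideImpulse' >> PeriodicImpulse(
          start_timestamp=start,
          stop_timestamp=stop,
          fire_interval=side_interval,
          apply_windowing=True)
      | 'TagSide' >> apache_beam.Map(lambda x: ('side', x))
      | 'LogSide' >> apache_beam.ParDo(Logger('side_input'))
  )
  main_input = (
      pipeline
      | 'MainImpulse' >> PeriodicImpulse(
          start_timestamp=start,
          stop_timestamp=stop,
          fire_interval=main_interval,
          apply_windowing=True)
      | 'TagMain' >> apache_beam.Map(lambda x: ('main', x))
      | 'LogMain' >> apache_beam.ParDo(Logger('main_input'))
      | 'CrossJoin' >> apache_beam.FlatMap(
          cross_join, rights=apache_beam.pvalue.AsIter(side_input))
      | 'CrossJoinLogger' >> apache_beam.ParDo(Logger('cross_join_output'))
  )
  pipeline.run().wait_until_finish()


if __name__ == '__main__':
  main()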

Am I missing something that prevents the main input from being processed in parallel with the side input?


Comments (2)

昇り龍 2025-02-03 00:03:34

The main input can advance only once the watermark has passed the end of the corresponding side input window. See the details in the programming guide. You likely need to window both the main and side inputs, and make sure PeriodicImpulse is correctly advancing the watermark.
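The rule above also explains the bounded/unbounded difference in the question. Here is a plain-Python toy model, not how any runner actually implements it. It assumes the default trigger, where a side input window becomes available once the side watermark reaches the window's end. It also assumes a bounded source's watermark jumps to +infinity when the source finishes, while an unbounded source's watermark only tracks its latest timestamp. `side_input_ready`, `side_watermark` and `GLOBAL_WINDOW_END` are hypothetical names.

```python
import math

# End of the global window: an unbounded source's watermark never reaches it.
GLOBAL_WINDOW_END = math.inf


def side_input_ready(window_end, side_watermark):
    """Toy default-trigger model: the side input for a window is available
    once the side input's watermark has reached the end of that window."""
    return side_watermark >= window_end


def side_watermark(emitted_timestamps, source_finished):
    """Toy watermark: a finished (bounded) source jumps to +infinity; an
    unbounded source only advances to its latest emitted timestamp."""
    if source_finished:
        return math.inf
    return max(emitted_timestamps, default=-math.inf)


side_ts = [0, 5, 10, 15]

# Bounded side input in the global window: ready once the source finishes.
bounded = side_watermark(side_ts, source_finished=True)
print(side_input_ready(GLOBAL_WINDOW_END, bounded))    # True

# Unbounded side input in the global window: never ready, so main input waits.
unbounded = side_watermark(side_ts, source_finished=False)
print(side_input_ready(GLOBAL_WINDOW_END, unbounded))  # False

# Unbounded side input in 5 s fixed windows: [5, 10) is ready, [15, 20) is not.
print(side_input_ready(10, unbounded), side_input_ready(20, unbounded))  # True False
```

The sketch shows why windowing both inputs (and a watermark that actually advances) matters. With fixed windows, readiness is decided per window rather than once at the end of time.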

南七夏 2025-02-03 00:03:34

Using the example from stackoverflow.com/q/70561769, I was able to get the side input and main input working concurrently as expected in certain cases. The answer there was to apply GlobalWindows() to the side_input.

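from apache_beam import WindowInto
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse

side_input = (pipeline
  # Emit an element every 300 s without assigning fixed windows.
  | PeriodicImpulse(fire_interval=300, apply_windowing=False)
  # Global window, with a new pane 5 s (processing time) after each pane starts.
  | "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
      trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
      accumulation_mode=trigger.AccumulationMode.DISCARDING)
  | ...
)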
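Why this workaround unblocks the main input can be seen with a plain-Python toy model. It assumes the behavior the Beam programming guide describes for side inputs with multiple trigger firings: the value from the latest firing is used. The side input becomes usable after its first processing-time firing instead of waiting for the global window to close. The same model suggests one likely reason timestamps stop lining up: every main element maps to the single global window, so its own timestamp plays no part in which side data it sees. `GlobalWindowSideInput` and its methods are hypothetical and are not Beam API.

```python
class GlobalWindowSideInput:
    """Toy model of a side input in the global window whose trigger fires
    repeatedly with DISCARDING accumulation (hypothetical, not Beam API)."""

    def __init__(self):
        # No pane has fired yet, so a runner would hold the main input.
        self._latest_pane = None

    def fire(self, pane_elements):
        # DISCARDING: a pane holds only elements since the previous firing,
        # and the newest firing replaces whatever was visible before.
        self._latest_pane = list(pane_elements)

    def ready(self):
        return self._latest_pane is not None

    def lookup(self, main_timestamp):
        # Every main element maps to the one global window, so its own
        # timestamp does not influence which side data it gets.
        del main_timestamp
        return self._latest_pane


side = GlobalWindowSideInput()
print(side.ready())    # False: the main input would wait here
side.fire(['db@0'])
print(side.lookup(7))  # ['db@0']
side.fire(['db@300'])
print(side.lookup(7))  # ['db@300']: same main timestamp, newer side data
```

The trade-off in this model is latency versus alignment. The main input no longer blocks, but each element is joined with whatever side data fired most recently, not with side data from its own time range.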

Based on experimentation, my conclusion is that PeriodicImpulse on the side input causes the main input to block in some configurations but not in others:

Case 1: GOOD. GlobalWindow; main input = PubSub; side input = PeriodicImpulse.
Case 2: BAD. FixedWindow; main input = PubSub; side input = PeriodicImpulse.
Case 3: BAD. GlobalWindow / FixedWindow; main input = PeriodicImpulse; side input = PeriodicImpulse.
Case 4: GOOD. FixedWindow; main input = PubSub; side input = PubSub.

My problem now is that the side input timestamps are not aligning properly with the main input; see stackoverflow.com/q/72382440.
