Streaming Flink job on Dataproc throws gRPC errors from the workers

Posted on 2025-01-25 12:55:28

My streaming Flink job on Dataproc (reading from a Pub/Sub source) throws multiple error messages on the workers:

Traceback (most recent call last):
  File "test.py", line 175, in <module>
    run(
  File "test.py", line 139, in run
    pub_sub_data = ( pipeline | "Read from Pub/Sub" >> pubsub.ReadFromPubSub(topic=input_topic))
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 1090, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 614, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/io/external/gcp/pubsub.py", line 98, in expand
    pcoll = pbegin.apply(
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pvalue.py", line 134, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 473, in expand
    response = service.Expand(request)
  File "/opt/conda/default/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/opt/conda/default/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1651418111.458421765","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3128,"referenced_errors":[{"created":"@1651418111.458419596","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"
>

I am using the apache_beam.io.external.gcp.pubsub.ReadFromPubSub transform to read from the Pub/Sub topic.

  • Python 3.8
  • apache-beam[gcp] 2.34.0
  • Flink 1.12

Code:

    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, checkpointing_interval=1000, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        pub_sub_data = ( pipeline | "Read from Pub/Sub" >> pubsub.ReadFromPubSub(topic=input_topic))
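The UNAVAILABLE status in the traceback is raised from the external transform's expansion call (service.Expand(request)), which suggests the Beam expansion service was not reachable when the pipeline was constructed. As a quick sanity check, a plain TCP probe against the expansion-service address can confirm whether anything is listening before the job is submitted; this is only a sketch, and the host/port below are placeholders (the actual address depends on how the job server was started):

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Placeholder address: substitute the expansion service host/port your
# job server actually exposes (8097 is a commonly used default, but not guaranteed).
print(port_open("localhost", 8097))
```

If the probe returns False, the gRPC UNAVAILABLE error is expected, and the fix is to start (or correctly address) the expansion service rather than to change the pipeline code.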

I also tried apache_beam.io.ReadFromPubSub (the native transform) for reading from the Pub/Sub topic, and below is the error I get:

DEBUG:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
    at org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:234)
    at org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:127)
    at org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:90)
    at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:70)
    at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:93)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:112)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
    at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "test.py", line 175, in <module>
    run(
  File "test.py", line 144, in run
    _ = main_error | "Transformation Errors to GCS" >> ParDo(WriteToGCS(output_path))
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result.wait_until_finish()
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 600, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-ravi-0501153516-4f843e9f_2e7c1bb8-7ac7-4adc-a8f4-fa9f0f97b770 failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
DEBUG:root:Sending SIGINT to job_server
