Apache Beam / Dataflow Go SDK pipeline doesn't process any Pub/Sub messages



I hope everyone's Friday is going well. I am desperately looking for some help with the Apache Beam Go SDK (https://github.com/apache/beam/tree/master/sdks). I've written a pipeline using it to process Pub/Sub events and have gotten to the stage where my workers start up nicely, but no messages are consumed from Pub/Sub. I've tried running the example provided in the SDK (streaming_wordcap), which uses the same pubsubio, and the result is the same: no messages in the newly created topics are consumed. I wonder if there is an extra option I should be enabling, or some deployment-specific flag? I am a little bit lost now.

There are messages in the subscription (a few million). When I experimented and changed the subscription name to something that doesn't exist, I did see errors in the Dataflow logs. Otherwise there are no errors and no information apart from the generic Dataflow debug output:

2022-07-08T11:21:31.793474125Z Starting 3 workers in europe-west4-a...
Debug
2022-07-08T11:21:31.820662575Z Starting worker pool setup.
Debug
2022-07-08T11:22:00.789962383Z Autoscaling: Raised the number of workers to 3 so that the pipeline can catch up with its backlog and keep up with its input rate.
Debug
2022-07-08T11:22:50.806937837Z Workers have started successfully.

Here is a part of my pipeline code:

import (
    "bytes"
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    // ...plus the module-local package that defines event.Envelope
)

var (
    inputTopic        = flag.String("topic", "", "PubSub input topic (required).")
    inputSubscription = flag.String("inputSubscription", "", "PubSub input subscription (required).")
    outputTableSpec   = flag.String("outputTableSpec", "", "Output BQ table (required).")
)

func init() {
    beam.RegisterType(reflect.TypeOf((*event.Envelope)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*decodeEnvelopeJSONFunc)(nil)).Elem())
    [...]
}

func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    if err := validateFlags(); err != nil {
        log.Exit(ctx, err.Error())
    }

    project := gcpopts.GetProject(ctx)
    p, s := beam.NewPipelineWithRoot()

    // Consume messages from Pub/Sub; when Subscription is set, pubsubio
    // reads from that existing subscription instead of the topic.
    pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
        Subscription: *inputSubscription, WithAttributes: false, IDAttribute: "", TimestampAttribute: "",
    })

    eventMapper := DecodeAndMap(s, pubSubMessages)

    bigqueryio.Write(s, project, *outputTableSpec, eventMapper)

    if err := beamx.Run(ctx, p); err != nil {
        log.Fatalf(ctx, "failed to execute job: %v", err)
    }
}

// DecodeAndMap decodes raw Pub/Sub payloads into event.Envelope values and
// maps their payloads into rows for BigQuery.
func DecodeAndMap(s beam.Scope, messages beam.PCollection) beam.PCollection {
    s = s.Scope("DecodeAndMap")
    events := beam.ParDo(s, &decodeEnvelopeJSONFunc{}, messages)
    return beam.ParDo(s, &mapPayloadFunc{}, events)
}

// decodeEnvelopeJSONFunc is a DoFn that JSON-decodes each message into an event.Envelope.
type decodeEnvelopeJSONFunc struct{}

func (f *decodeEnvelopeJSONFunc) ProcessElement(ctx context.Context, msg []byte, emit func(*event.Envelope)) error {
    var e event.Envelope
    log.Infoln(ctx, "decoding envelope")

    if err := json.NewDecoder(bytes.NewReader(msg)).Decode(&e); err != nil {
        return fmt.Errorf("failed to decode envelope: %w", err)
    }

    log.Infoln(ctx, "emitting envelope")
    emit(&e)

    return nil
}

[...]

Here is how I am deploying my pipeline:

go run ./pkg/my-mapper/. \
    --runner dataflow \
    --job_name my-mapper \
    --project mb-gcp-project \
    --region europe-west4 --zone europe-west4-a \
    --temp_location gs://my-beam-tmp-data-bucket/tmp/ \
    --staging_location gs://my-beam-tmp-data-bucket/binaries/ \
    --worker_harness_container_image=apache/beam_go_sdk:latest \
    --subnetwork regions/europe-west4/subnetworks/my-subnetwork \
    --num_workers 3 \
    --max_num_workers 10 \
    --async --update \
    --topic my-topic-name --inputSubscription my-sub-name --outputTableSpec my-gcp-project:my_dataset.mapped_data
2022/07/08 12:16:33 Cross-compiling ... as /tmp/worker-1-1657278993706049280
[...]
  "type": "JOB_TYPE_STREAMING"
}
2022/07/08 12:20:11 Submitted job: 2022-07-08_04_20_11-11918574995509384496

Worker pool running.


Comments (1)

〗斷ホ乔殘χμё〖 2025-02-20 23:58:22


This looks like a windowing problem. It's a streaming pipeline that isn't windowing its data before writing out, which means it's trying to aggregate in the default global window, so it never terminates and never acks the messages.

Add a beam.WindowInto step at some granularity after the Pub/Sub step, either with plain windowing or with triggers. See the Programming Guide sections on windowing and triggers for more info: https://beam.apache.org/documentation/programming-guide/#windowing
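
For example, here is a minimal sketch of that fix applied to the pipeline in the question. The one-minute window size is an arbitrary choice for illustration; pick a granularity that suits your latency needs. It assumes the additional imports "time" and "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window".

    pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
        Subscription: *inputSubscription,
    })

    // Assign elements to fixed one-minute windows so downstream aggregations
    // (including the BigQuery write) can complete per window instead of
    // waiting for the end of the never-ending global window.
    windowedMessages := beam.WindowInto(s, window.NewFixedWindows(time.Minute), pubSubMessages)

    eventMapper := DecodeAndMap(s, windowedMessages)
    bigqueryio.Write(s, project, *outputTableSpec, eventMapper)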

This will allow any aggregation operation in the bigqueryio.Write step to actually complete, regardless of whether it's the "xlang" version or not.

At this time, I'd recommend using the xlang version, as it has been validated for streaming use. The Go-native one hasn't been tested and vetted for streaming writes, so there could be issues as a result.

E.g. if you're using the native bigqueryio, your pipeline is getting held up here, as the pipeline is waiting for the end of the global window:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/bigqueryio/bigquery.go#L207

Otherwise, here are some other things to look into:

  1. Ensure your import paths use the "sdks/v2" module path of the SDK. Imports without that v2 are stuck at 2.32 and aren't expected to work; no other code changes are expected. A good import resolves to the latest release (2.40 as of writing), while a bad one resolves to 2.32+incompatible (see the sketch after this list). Edit: Looking at the screenshot, you're already doing this. Excellent!
  2. Ensure that the fields of event.Envelope are all exported. Otherwise they won't be serialized by either JSON or the Beam-native schema encoding (also illustrated in the sketch after this list). I can't see this being the root problem, however.
  3. If you're using a released version of the SDK (v2.40.0, v2.39.0, etc.), you don't need to include the worker_harness_container_image flag, and in fact it could cause worker startup issues. While historically the Beam Go container hasn't changed much, that's no longer the case as additional features are being added, which requires coordination between the container bootloader and the worker harness code.
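
A sketch illustrating points 1 and 2. The Envelope fields here are hypothetical stand-ins; your real type will differ.

    import (
        "encoding/json"

        // Point 1: good, the v2 module path, which tracks current releases.
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        // Bad: "github.com/apache/beam/sdks/go/pkg/beam" resolves to
        // 2.32+incompatible and isn't expected to work.
    )

    // Point 2: only exported (capitalized) fields are serialized, whether by
    // encoding/json or by Beam's native schema encoding.
    type Envelope struct {
        ID      string          `json:"id"`      // exported: serialized
        Payload json.RawMessage `json:"payload"` // exported: serialized
        // internal string                       // unexported: silently dropped
    }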

Cheers!
