Apache Beam / Dataflow Go SDK pipeline does not process any PubSub messages
I hope everyone's Friday is going well. I am desperately looking for some help with the Apache Beam Go SDK (https://github.com/apache/beam/tree/master/sdks). I've written a pipeline that uses it to process PubSub events and got to the stage where my workers start up nicely, but no messages are being consumed from PubSub. I've also tried running the example provided in the SDK (streaming_wordcap), which uses the same pubsubio, and the result is the same: no messages in the newly created topics are consumed. I wonder if there is an extra option I should be enabling, or some deployment-specific flag? I am a little bit lost now.
There are messages in the subscription (a few million). As an experiment I changed the subscription name to something that doesn't exist, and I then saw errors in the Dataflow logs. Otherwise there are no errors and no information apart from the generic Dataflow debug output below.
2022-07-08T11:21:31.793474125Z Starting 3 workers in europe-west4-a...
2022-07-08T11:21:31.820662575Z Starting worker pool setup.
2022-07-08T11:22:00.789962383Z Autoscaling: Raised the number of workers to 3 so that the pipeline can catch up with its backlog and keep up with its input rate.
2022-07-08T11:22:50.806937837Z Workers have started successfully.
Here is a part of my pipeline code:
var (
	inputTopic        = flag.String("topic", "", "PubSub input topic (required).")
	inputSubscription = flag.String("inputSubscription", "", "PubSub input subscription (required).")
	outputTableSpec   = flag.String("outputTableSpec", "", "Output BQ table (required).")
)

func init() {
	beam.RegisterType(reflect.TypeOf((*event.Envelope)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*decodeEnvelopeJSONFunc)(nil)).Elem())
	[...]
}

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	if err := validateFlags(); err != nil {
		log.Exit(ctx, err.Error())
	}

	project := gcpopts.GetProject(ctx)
	p, s := beam.NewPipelineWithRoot()

	// Read raw messages from the PubSub subscription.
	pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
		Subscription: *inputSubscription, WithAttributes: false, IDAttribute: "", TimestampAttribute: "",
	})
	// Decode the JSON envelopes and map them to output rows.
	eventMapper := DecodeAndMap(s, pubSubMessages)
	// Write the mapped rows to BigQuery.
	bigqueryio.Write(s, project, *outputTableSpec, eventMapper)

	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "failed to execute job: %v", err)
	}
}

func DecodeAndMap(s beam.Scope, messages beam.PCollection) beam.PCollection {
	s = s.Scope("DecodeAndMap")
	events := beam.ParDo(s, &decodeEnvelopeJSONFunc{}, messages)
	return beam.ParDo(s, &mapPayloadFunc{}, events)
}

type decodeEnvelopeJSONFunc struct{}

func (f *decodeEnvelopeJSONFunc) ProcessElement(ctx context.Context, msg []byte, emit func(*event.Envelope)) error {
	var e event.Envelope
	log.Infoln(ctx, "decoding envelope")
	if err := json.NewDecoder(bytes.NewReader(msg)).Decode(&e); err != nil {
		return fmt.Errorf("failed to decode envelope: %w", err)
	}
	log.Infoln(ctx, "emitting envelope")
	emit(&e)
	return nil
}
[...]
Here is how I am deploying my pipeline:
go run ./pkg/my-mapper/. \
--runner dataflow \
--job_name my-mapper \
--project mb-gcp-project \
--region europe-west4 --zone europe-west4-a \
--temp_location gs://my-beam-tmp-data-bucket/tmp/ \
--staging_location gs://my-beam-tmp-data-bucket/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest \
--subnetwork regions/europe-west4/subnetworks/my-subnetwork \
--num_workers 3 \
--max_num_workers 10 \
--async --update \
--topic my-topic-name --inputSubscription my-sub-name --outputTableSpec my-gcp-project:my_dataset.mapped_data
2022/07/08 12:16:33 Cross-compiling ... as /tmp/worker-1-1657278993706049280
[...]
"type": "JOB_TYPE_STREAMING"
}
2022/07/08 12:20:11 Submitted job: 2022-07-08_04_20_11-11918574995509384496
1 Answer
This looks like a windowing problem. It's a streaming pipeline that isn't windowing its data before writing out, which means it's trying to aggregate in the default global window, and so it never terminates or acks the messages.
Add a beam.WindowInto step at some granularity after the PubSub step, either with some windowing or with triggers. See the Programming Guide on windowing and triggers for more info: https://beam.apache.org/documentation/programming-guide/#windowing
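As a rough sketch of what that could look like in this pipeline (the one-minute fixed window is just an assumption, pick whatever granularity suits your BigQuery write; you'll also need "time" and the window package, github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window, in your imports):

	// Inside main(), after the PubSub read:
	pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
		Subscription: *inputSubscription,
	})

	// Window the unbounded PubSub stream into fixed 1-minute windows so the
	// downstream write can complete per window and the messages can be acked,
	// instead of waiting for the end of the global window.
	windowed := beam.WindowInto(s, window.NewFixedWindows(time.Minute), pubSubMessages)

	eventMapper := DecodeAndMap(s, windowed)
	bigqueryio.Write(s, project, *outputTableSpec, eventMapper)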
This will allow any aggregation operation in the bigqueryio.Write step to actually complete, regardless of whether it's the "xlang" version or not.
At this time I'd recommend using the xlang version, as that has been validated for streaming use. The Go-native one hasn't been tested and vetted to work properly for streaming writes, so there could be issues as a result.
E.g. if you're using the native bigqueryio, your pipeline is getting held up here, as the pipeline is waiting for the end of the global window:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/bigqueryio/bigquery.go#L207
Otherwise, here are some other things to look into:
- Make sure the fields of event.Envelope are all exported. Otherwise they won't be serialized by either JSON or the Beam-native schema encoding. I can't see this being the root problem, however (there's a small illustration at the end of this answer).
- You shouldn't need to set the worker_harness_container_image flag, and in fact it could cause worker startup issues. While historically the Beam Go container hasn't changed much, that's no longer the case as additional features are being added, which require coordination between the container bootloader and the worker harness code.
Cheers!
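To illustrate the exported-fields point: the real definition of event.Envelope isn't shown in the question, so the field names below are hypothetical, but the rule is the same for any type you register with Beam.

	// Hypothetical envelope type. Only exported (capitalised) fields are visible
	// to encoding/json and to Beam's coders/schema encoding; unexported fields
	// are silently dropped when elements are serialized between workers.
	type Envelope struct {
		ID      string          `json:"id"`      // exported: encoded and decoded
		Payload json.RawMessage `json:"payload"` // exported: encoded and decoded
		source  string          // unexported: ignored by both JSON and Beam
	}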