缩放超过1个工人时,Kafka到BigQuery管道上的数据流摊位
我有一个kafka-to-to-ligquery dataflow管道,我从多个主题中消费,并使用动态目的地来输出每个主题的合适的bigquery表。
当仅涉及一个工人时,管道会顺利运行。
但是,当它自动尺度或手动配置为1个以上的工人时,它完全停滞在BigQuery Write Transform,数据陈旧的增长和增长。无论来自KAFKA的数据流数量如何,数据稳定性的增长似乎都会发生。此时,没有插入桌子行。
没有向工作或工人日志报告错误。
具体而言,在大Query写入转换的内部,它在编写之前的改组过程中正在停滞不前。
在BigQuery Write Transform期间发生的重组中,有一个groupByKey
,其中管道失速如下:
我可以看到window.into
上面的步骤工作正常:
但是,在写入转换的大问题中并未隔离此问题。如果我在管道中添加了一个重组步骤,例如,在“从kafka记录中提取的摘录”
步骤之后,停滞管道的问题出现在同一groupbykey 。
我正在使用Beam SDK版本2.39.0。
该管道的设计按照发现在这里。以下是管道代码的概述。还尝试使用固定的窗口
PCollection<MessageData> messageData = pipeline
.apply("Read from Kafka",
KafkaReadTransform.read(
options.getBootstrapServers(),
options.getInputTopics(),
kafkaProperties))
.apply("Extract from Kafka record",
ParDo.of(new KafkaRecordToMessageDataFn()));
/*
* Step 2: Validate Protobuf messages and convert to FailsafeElement
*/
PCollectionTuple failsafe = messageData
.apply("Parse and convert to Failsafe", ParDo.of(new MessageDataToFailsafeFn())
.withOutputTags(Tags.FAILSAFE_OUT, TupleTagList.of(Tags.FAILSAFE_DEADLETTER_OUT)));
/*
* Step 3: Write messages to BigQuery
*/
WriteResult writeResult = failsafe.get(Tags.FAILSAFE_OUT)
.apply("Write to BigQuery", new BigQueryWriteTransform(project, dataset, tablePrefix));
/*
* Step 4: Write errors to BigQuery deadletter table
*/
failsafe.get(Tags.FAILSAFE_DEADLETTER_OUT)
.apply("Write failsafe errors to BigQuery",
new BigQueryDeadletterWriteTransform(project, dataset, tablePrefix));
writeResult.getFailedInsertsWithErr()
.apply("Extract BigQuery insertion errors", ParDo.of(new InsertErrorsToFailsafeRecordFn()))
.apply("Write BigQuery insertion errors",
new BigQueryDeadletterWriteTransform(project, dataset, tablePrefix));
bigQueryWritEtransform
,其中管道正在停滞:
BigQueryIO.<FailsafeElement<MessageData, ValidatedMessageData>>write()
.to(new MessageDynamicDestinations(project, dataset, tablePrefix))
.withFormatFunction(TableRowMapper::toTableRow)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
.withExtendedErrorInfo()
.optimizedWrites()
格式化函数:
public static TableRow toTableRow(FailsafeElement<MessageData, ValidatedMessageData> failsafeElement) {
try {
ValidatedMessageData messageData = Objects.requireNonNull(failsafeElement.getOutputPayload());
byte[] data = messageData.getData();
String messageType = messageData.getMessageType();
long timestamp = messageData.getKafkaTimestamp();
return TABLE_ROW_CONVERTER_MAP.get(messageType).toTableRow(data, timestamp);
} catch (Exception e) {
log.error("Error occurred when converting message to table row.", e);
return null;
}
}
kafkareadtransform
看起来像这样:
KafkaIO.<String, byte[]>read()
.withBootstrapServers(bootstrapServer)
.withTopics(inputTopics)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.withConsumerConfigUpdates(kafkaProperties);
我还尝试了此固定窗口后,在此之后固定窗口。从KAFKA记录中提取消息数据,即使模板似乎没有使用窗口:
PCollection<MessageData> messageData = pipeline
.apply("Read from Kafka",
KafkaReadTransform.read(
options.getBootstrapServers(),
options.getInputTopics(),
kafkaProperties))
.apply("Extract from Kafka record",
ParDo.of(new KafkaRecordToMessageDataFn()))
.apply("Windowing", Window.<MessageData>into(FixedWindows.of(WINDOW_DURATION))
.withAllowedLateness(LATENESS_COMPENSATION_DURATION)
.discardingFiredPanes());
我没有想法,并且对数据流的了解不足以知道如何进一步诊断此问题。
I've got a Kafka-to-BigQuery Dataflow pipeline where I am consuming from multiple topics and using dynamic destinations to output to the appropriate BigQuery tables for each topic.
The pipeline runs smoothly when only one worker is involved.
However when it auto-scales or is manually configured to more than 1 worker, it completely stalls at the BigQuery write transform and data staleness grows and grows. The growth of data staleness seems to happen regardless of the amount of data streaming from Kafka. No table rows are inserted into BigQuery at this point.
No errors are reported to the job or worker logs.
Specifically, internal to the BigQuery write transform, it is stalling during a reshuffle before the write.
Within the Reshuffle that occurs during the BigQuery write transform, there is a GroupByKey
where the pipeline stalls, as shown below:
I can see that the Window.into
step above is working fine:
However, this problem is not isolated in the BigQuery to the write transform. If I add a Reshuffle step in the pipeline, for example after the "Extract from Kafka record"
step, the problem of a stalling pipeline appears in this new Reshuffle step at the same GroupByKey
.
I am using Beam SDK version 2.39.0.
The pipeline was designed following the example of the Kafka-to-BigQuery template found here. Below is an overview of the pipeline code. This has also been attempted with Fixed Windowing
PCollection<MessageData> messageData = pipeline
.apply("Read from Kafka",
KafkaReadTransform.read(
options.getBootstrapServers(),
options.getInputTopics(),
kafkaProperties))
.apply("Extract from Kafka record",
ParDo.of(new KafkaRecordToMessageDataFn()));
/*
* Step 2: Validate Protobuf messages and convert to FailsafeElement
*/
PCollectionTuple failsafe = messageData
.apply("Parse and convert to Failsafe", ParDo.of(new MessageDataToFailsafeFn())
.withOutputTags(Tags.FAILSAFE_OUT, TupleTagList.of(Tags.FAILSAFE_DEADLETTER_OUT)));
/*
* Step 3: Write messages to BigQuery
*/
WriteResult writeResult = failsafe.get(Tags.FAILSAFE_OUT)
.apply("Write to BigQuery", new BigQueryWriteTransform(project, dataset, tablePrefix));
/*
* Step 4: Write errors to BigQuery deadletter table
*/
failsafe.get(Tags.FAILSAFE_DEADLETTER_OUT)
.apply("Write failsafe errors to BigQuery",
new BigQueryDeadletterWriteTransform(project, dataset, tablePrefix));
writeResult.getFailedInsertsWithErr()
.apply("Extract BigQuery insertion errors", ParDo.of(new InsertErrorsToFailsafeRecordFn()))
.apply("Write BigQuery insertion errors",
new BigQueryDeadletterWriteTransform(project, dataset, tablePrefix));
BigQueryWriteTransform
, where the pipeline is stalling:
BigQueryIO.<FailsafeElement<MessageData, ValidatedMessageData>>write()
.to(new MessageDynamicDestinations(project, dataset, tablePrefix))
.withFormatFunction(TableRowMapper::toTableRow)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
.withExtendedErrorInfo()
.optimizedWrites()
Formatting function:
public static TableRow toTableRow(FailsafeElement<MessageData, ValidatedMessageData> failsafeElement) {
try {
ValidatedMessageData messageData = Objects.requireNonNull(failsafeElement.getOutputPayload());
byte[] data = messageData.getData();
String messageType = messageData.getMessageType();
long timestamp = messageData.getKafkaTimestamp();
return TABLE_ROW_CONVERTER_MAP.get(messageType).toTableRow(data, timestamp);
} catch (Exception e) {
log.error("Error occurred when converting message to table row.", e);
return null;
}
}
KafkaReadTransform
looks like this:
KafkaIO.<String, byte[]>read()
.withBootstrapServers(bootstrapServer)
.withTopics(inputTopics)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.withConsumerConfigUpdates(kafkaProperties);
I have also tried this with Fixed Windowing in place after the extraction of message data from the Kafka record, even though the template does not appear to use windowing:
PCollection<MessageData> messageData = pipeline
.apply("Read from Kafka",
KafkaReadTransform.read(
options.getBootstrapServers(),
options.getInputTopics(),
kafkaProperties))
.apply("Extract from Kafka record",
ParDo.of(new KafkaRecordToMessageDataFn()))
.apply("Windowing", Window.<MessageData>into(FixedWindows.of(WINDOW_DURATION))
.withAllowedLateness(LATENESS_COMPENSATION_DURATION)
.discardingFiredPanes());
I'm out of ideas and don't know DataFlow well enough to know how to diagnose this problem further.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论