Structured Streaming - process data based on new events in a Kafka topic

I have a Structured Streaming program which reads data from Kafka topic A, does some processing, and finally puts the data into a target Kafka topic.

Note: the processing is done in the function convertToDictForEachBatch(), which is invoked via foreachBatch(convertToDictForEachBatch).

As part of the processing, it reads another Kafka topic (events_topic); if there are new records since the last read, it does some additional processing - it reloads data from a BigQuery table and persists it.

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

print(" df_stream -> ", df_stream)

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='4 minutes') \
    .option("numRows", 10000) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

query.awaitTermination()

# called from foreachBatch
def convertToDictForEachBatch(df, batchId):

    # Process the micro-batch dataframe here (code omitted - not relevant to this question)

    # Additional processing: reload prediction data from BigQuery into a DataFrame,
    # driven by events in the Kafka topic topic_reloadpred. Further processing should
    # take place only if there is new data in that topic.
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done only if new events were generated

What is the best way to achieve this?
The current code reads the entire Kafka topic on every batch; I need it to read only the new data.
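
For reference, a batch read from the Kafka source can be restricted to an explicit offset range through the startingOffsets / endingOffsets options passed as JSON. The sketch below only illustrates that mechanism under assumptions that are not in the code above: a single partition "0", a made-up starting offset of 42 (last processed offset + 1), and the SSL options omitted for brevity.

import json

# Illustrative batch read limited to a known offset range.
# -1 in endingOffsets means "read up to the latest offset" for that partition.
starting = json.dumps({topic_reloadpred: {"0": 42}})
ending = json.dumps({topic_reloadpred: {"0": -1}})

events = spark.read.format('kafka') \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic_reloadpred) \
    .option("startingOffsets", starting) \
    .option("endingOffsets", ending) \
    .load()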


Comments (1)

美男兮 2025-01-20 22:39:46

As suggested by @Rishabh Sharma, I'm storing the offset in a separate Kafka topic that has a single partition (I could also store it in a separate partition of the same topic).
During processing, I check the last updated offset against the current offset. If the current offset is greater than the last updated offset, I do the further processing (i.e. reload the table from BigQuery).
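
A minimal sketch of that check, assuming the "reload" events land in a single-partition topic (offsets_topic and reload_needed below are placeholder names, not from the question's code), that last_seen_offset can be kept in driver memory between micro-batches, and that the same SSL options as in the question would be added to the read:

from pyspark.sql import functions as F

last_seen_offset = -1  # driver-side state, survives between micro-batches

def reload_needed(spark, kafkaBrokers, offsets_topic):
    global last_seen_offset

    # Batch read of the single-partition topic that only receives "reload" events.
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", offsets_topic) \
        .option("startingOffsets", "earliest") \
        .load()

    # Highest offset currently in the topic (single partition, so a single max value).
    max_offset = events.agg(F.max("offset").alias("max_offset")).collect()[0]["max_offset"]
    current_offset = max_offset if max_offset is not None else -1

    if current_offset > last_seen_offset:
        last_seen_offset = current_offset
        return True   # a new event arrived since the last check -> reload from BigQuery
    return False

Since foreachBatch runs on the driver, keeping last_seen_offset in driver memory works across micro-batches of a single run; it would be reset on restart, which is why publishing the processed offset back to the offsets topic (as described above) keeps the state durable.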
