Kafka的 Producer.flush() 是否验证所有数据都发送到主题?

发布于 2025-01-11 09:45:07 字数 582 浏览 0 评论 0原文

我每天通过 Pythonic KafkaProducer 向 Kafka 主题发送大约 1200 万行数据。

数据需要为JSON格式,每行为1个条目, 每 100 万行执行一次 Producer.flush() 。

代码如下:

import json
count = 0
for key in json_df:
    x = json.dumps(key, default=str).encode('utf-8')
    producer.send('SOME_TOPIC_NAME', value=x)
    count+=1
    if (count % 1000000 == 0):
        producer.flush()
producer.flush()  

但是今天我遇到了一个问题,主题中缺少 250~ 行,我的问题是:执行 Producer.flush() 是否更频繁地保证没有数据失踪了?

我在 KafkaProducer 对象中也有 linger_ms = 500 ,降低 linger 的频率是否可以确保所有数据都发送到主题?

I'm sending about 12 million rows of data to Kafka topic, via Pythonic KafkaProducer on a daily basis.

The data needs to be JSON formatted, and each row as a 1 entry,
and does a producer.flush() every 1 million rows.

The code is provided below:

import json
count = 0
for key in json_df:
    x = json.dumps(key, default=str).encode('utf-8')
    producer.send('SOME_TOPIC_NAME', value=x)
    count+=1
    if (count % 1000000 == 0):
        producer.flush()
producer.flush()  

But today I have faced a problem that 250~ rows were missing from the topic, and my question is: does the execution of producer.flush() more often guarantees that no data is missing?

I also have linger_ms = 500 in KafkaProducer object, does decreasing the frequency of linger makes sure that all the data is sent to topic?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

你与清晨阳光 2025-01-18 09:45:07

我不认为冲洗是问题所在。您的代码中没有任何内容可以检查程序本身是否因少于 1M 条记录而终止。当您重新启动生产者时,计数重新开始为零,并且您将丢失所有中间批处理记录。

回答这个问题 - 是的,flush 是一个阻塞调用,它清空批次中可用的数据...请记住,Kafka 确实有一个 batch.size 上限(以字节为单位)。

如果您已经有 JSON-lines 格式的文件,您可以这样做

kafka-console-producer ... < file.jsonl

I dont think the flush is the problem. There is nothing in your code that checks if the program itself dies with less than 1M records. When you restart the producers, the count restarts a zero, and you lose all intermediate batched records.

To answer the question - yes, flush is a blocking call that empties the data that is available in the batch... Keep in mind that Kafka does have a batch.size upper limit (in bytes).

If you already had a JSON-lines formatted file, you could just do

kafka-console-producer ... < file.jsonl
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文