Does Kafka's producer.flush() verify that all data has been sent to the topic?
I'm sending about 12 million rows of data to a Kafka topic via the Python KafkaProducer on a daily basis. The data needs to be JSON formatted, each row as one entry, and I do a producer.flush() every 1 million rows.
The code is provided below:
import json

count = 0
for key in json_df:
    x = json.dumps(key, default=str).encode('utf-8')
    producer.send('SOME_TOPIC_NAME', value=x)
    count += 1
    if count % 1000000 == 0:
        producer.flush()
producer.flush()
But today I faced a problem: around 250 rows were missing from the topic. My question is: does executing producer.flush() more often guarantee that no data goes missing?
I also have linger_ms = 500 in the KafkaProducer object; does decreasing the linger time make sure that all the data is sent to the topic?
1 Answer
I don't think the flush is the problem. There is nothing in your code that checks whether the program itself dies with fewer than 1M records processed. When you restart the producer, the count restarts at zero, and you lose all intermediate batched records.
To answer the question - yes, flush is a blocking call that empties the data that is available in the batch... Keep in mind that Kafka does have a batch.size upper limit (in bytes). If you already had a JSON-lines formatted file, you could just do