使用Python,如何产生KAFKA消息超过1MB?

发布于 2025-02-13 04:24:46 字数 279 浏览 2 评论 0原文

我试图通过Kafka传输一些巨大的JSON数据,但没有发生。 Python生产商也没有任何错误,也没有消费者端的任何消息。

producer = KafkaProducer(bootstrap_servers="kafka_server")
product_message = json.dumps(data)
producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))

I was trying to transfer some huge json data through kafka, but its not happening.
Neither no error from python producer nor any messages on the consumer side.

producer = KafkaProducer(bootstrap_servers="kafka_server")
product_message = json.dumps(data)
producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

度的依靠╰つ 2025-02-20 04:24:46

您可能需要增加主题配置 > 能够产生该消息。

另外,您可能可以通过获得发送返回的未来来了解发生的事情: https://stackover.com/a> https:/ /55538034/19059974

You may need to increase the topic configuration max.message.bytes to be able to produce the message.

Also, you can probably get a hint of what is going on by getting the future that send returns: https://stackoverflow.com/a/55538034/19059974

維他命╮ 2025-02-20 04:24:46

我有相同的错误,并且通过阅读这个线程我已经解决了这样的问题:

假设您有文件server.propertiescomputer.properties在路径/usr/usr/local/kafka/kafka/config/ (可以在您的环境变量中定义此路径为$ kafka_home,但不必这样做)

并且您想将最大消息长度增加到15728640(15 MB) ),

服务器端(= broker)

sudo vim /usr/local/kafka/config/server.properties

添加行或更改变量值

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

消费者端

sudo vim /usr/local/kafka/config/consumer.properties

添加行或更改变量值

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640
max.partition.fetch.bytes=15728640
fetch.max.bytes=15728640

重新启动服务

使更改有效(手动运行这些命令)

sudo systemctl daemon-reload  # wait a little

sudo systemctl restart zookeeper  # wait a little

sudo systemctl restart kafka  # wait a little

确保它们通过 如果没有运行

sudo systemctl status zookeeper
sudo systemctl status kafka

,请再次重新启动

生产者方面,在Python模块Kafka-Python的背景下

,您必须添加K-Argument max_request_size

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))

用行替换行

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'), max_request_size=15728640)

I had the same error and by reading this thread I have fixed the issue like this:

Supposing that you have files server.properties and consumer.properties at path /usr/local/kafka/config/ (this path could be defined in your environment variables as $KAFKA_HOME, but it does not have to)

and that you want to increase the max message length to 15728640 (15 MB),

server side (=broker)

sudo vim /usr/local/kafka/config/server.properties

add line or change variable value

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

consumer side

sudo vim /usr/local/kafka/config/consumer.properties

add line or change variable value

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640
max.partition.fetch.bytes=15728640
fetch.max.bytes=15728640

restart services

make changes effective (run these commands manually one by one)

sudo systemctl daemon-reload  # wait a little

sudo systemctl restart zookeeper  # wait a little

sudo systemctl restart kafka  # wait a little

Make sure they restarted correctly by running

sudo systemctl status zookeeper
sudo systemctl status kafka

If not, restart them again

Producer side, in the context of your python module kafka-python

Then you have to add the k-argument max_request_size (read the docs) to your KafkaProducer instance, so substitute line

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))

with line

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'), max_request_size=15728640)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文