在python-flask中调用kafka发送消息,消费者收不到
首先,在没有flask的时候,我写了kafka的程序,一个生产者,一个消费者。
生产者进程,它负责读取本地的一张图片,把图片的二进制数据以消息的形式发送给kafka,代码如下:
from kafka import KafkaProducer
# get the binary data of a picture
f=open('/home/seven/Pictures/fff.png','rb')
data=f.read()
f.close()
# create a producer
producer=KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer=str.encode)
# send th binary data to kafka
producer.send('img_msg',key="Hello,Assassin424214141",value=data)
消费者进程,它负责接收消息,并用消息内容还原出一张图片存储在磁盘上,代码如下:
from kafka import KafkaConsumer
# create a consumer
consumer = KafkaConsumer('img_msg',bootstrap_servers=['localhost:9092'])
# receive messages
for message in consumer:
# print the message
print ("hello %s:%d:%d: key=%s"%(message.topic, message.partition,message.offset, message.key))
# use the message's data to create a picture
img_data=message.value
outfile=open("test.png","wb")
outfile.write(img_data)
outfile.close()
代码比较简陋,是个测试用的原型,并且是可以跑的。
然而,我把生产者的代码放到flask(python-flask是一个web框架)中后。flask程序接收客户端的请求(客户端会上传一张图片),确实是接收到图片了,而且消息发送给kafka的时候也没有报错。但是怪异的是kafka的消费者那边却始终没有动静...
然后我做了一个极端的测试,服务端(即flask)把接收到的客户端上传的图片先写到磁盘上,再从磁盘上读取这张图片(就和一开始的测试原型是一样的逻辑,而且验证了图片是成功写到磁盘上的,即图片接收这一环节是没有问题的),然而这种情况下kafka的消费者却还是接收不到消息,如之奈何?
我接着做测试,发现一个普遍现象,无论是普通的字符串消息还是二进制的消息,只要不在flask中发送给kafka,则kafka的消费者都能收到。但是无论是什么类型的消息,只要把kafka生产者的代码放到flask中去,就会导致消费者那边啥也收不到。
有遇到同样情况的,请不吝赐教~
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
彻底解决flask中向kafka写入消息,结果消费者收不到消息的问题
假设客户端一次性发送了 : 一个文件(键名为targetfile),两个数字(键名为x和y)
flask服务端的解决方案是这样的 :
获取图片的二进制内容 :
flask.request.files['targetfile'].read()
获取两个数字参数 :
post_data=dict(flask.request.form)
x=post_data['x'][0]
y=post_data['y'][0]
// 此处不能用 flask.request.form['x']
// 也不能够用 flask.request.form.get('x')
// 也不能够用 flask.request.values.get('x')
// 否则,kafka的消费者那端就会收不到消息
// 很诡异,但是目前的这个方案能够解决问题
好吧,我发现了更加灵异的问题,我把kafka的生产者放到GET请求中,消费者那边是可以收到的。
然而在POST请求中调用kafka的producer,消费者那边就收不到。
感觉自己被flask和kafka玩弄了一样...
又搞了一上午,问题解决了,详细是这样的:
1. 要在kafka的生产者发送消息后,sleep一会(一般10毫秒就够了),但是这样还不行,准确来说。当flask处理POST请求,同时接收来自客户端的图片数据和非图片数据,kafka消费者就还是收不到消息。必须只能接收图片数据,此时才行得通。我也不知道为什么,感觉好神奇。
2. 好了,总结一下。
目前的解决方案是,
第一件事是要保证flask中不要同时接收图片和非图片数据,
第二件事是在kafka-producer发送消息后,sleep十几毫秒。两件事都要做,才能让kafka-consumer接收到消息。虽然sleep不是一个好办法,可以说又是迂行恶首,但目前也只能如此了。