如何使用 Avro 二进制编码器对 Kafka 消息进行编码/解码?
我正在尝试使用 Avro 来读取/写入 Kafka 的消息。有谁有使用 Avro 二进制编码器对将放入消息队列的数据进行编码/解码的示例吗?
与 Kafka 部分相比,我更需要 Avro 部分。或者,也许我应该考虑不同的解决方案?基本上,我试图找到一个在空间方面更有效的 JSON 解决方案。刚刚提到 Avro 是因为它比 JSON 更紧凑。
I'm trying to use Avro for messages being read from/written to Kafka. Does anyone have an example of using the Avro binary encoder to encode/decode data that will be put on a message queue?
I need the Avro part more than the Kafka part. Or, perhaps I should look at a different solution? Basically, I'm trying to find a more efficient solution to JSON with regards to space. Avro was just mentioned since it can be more compact than JSON.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(5)
这是一个基本示例。我还没有尝试过多个分区/主题。
//示例生产者代码
//示例消费者代码
第 1 部分:消费者组代码:因为您可以为多个分区/主题拥有多个消费者。
第 2 部分:实际消费消息的个人消费者。
测试 AVRO 模式:
需要注意的重要事项是:
您需要标准的 kafka 和 avro jar 才能开箱即用地运行此代码。
非常重要 props.put("serializer.class", "kafka.serializer.DefaultEncoder");
不要使用 stringEncoder,因为如果您将字节数组作为消息发送,它将无法工作。
您可以将 byte[] 转换为十六进制字符串并将其发送,然后在消费者上将十六进制字符串重新转换为 byte[],然后转换为原始消息。
按照此处提到的方式运行动物园管理员和代理:- http://kafka.apache.org/documentation.html#快速入门并创建一个名为“page_views”的主题或任何您想要的内容。
运行 ProducerTest.java,然后运行 ConsumerGroupExample.java,查看正在生成和使用的 avro 数据。
This is a basic example. I have not tried it with multiple partitions/topics.
//Sample producer code
//Sample consumer code
Part 1 : Consumer group code : as you can have more than multiple consumers for multiple partitions/ topics.
Part 2 : Indiviual consumer that actually consumes the messages.
Test AVRO schema :
Important things to note are :
Youll need the standard kafka and avro jars to run this code out of the box.
Is very important props.put("serializer.class", "kafka.serializer.DefaultEncoder");
Don
t use stringEncoder as that won
t work if you are sending a byte array as message.You can convert the byte[] to a hex string and send that and on the consumer reconvert hex string to byte[] and then to the original message.
Run the zookeeper and the broker as mentioned here :- http://kafka.apache.org/documentation.html#quickstart and create a topic called "page_views" or whatever you want.
Run the ProducerTest.java and then the ConsumerGroupExample.java and see the avro data being produced and consumed.
我终于记得询问 Kafka 邮件列表并得到以下答案,效果非常好。
I finally remembered to ask the Kafka mailing list and got the following as an answer, which worked perfectly.
如果你想从 Avro 消息中获取字节数组(kafka 部分已经回答),请使用二进制编码器:
If you want to get a byte array from an Avro message (the kafka part is already answered), use the binary encoder:
除了 Avro,您还可以简单地考虑压缩数据;要么使用 gzip(压缩效果好,CPU 更高),要么使用 LZF 或 Snappy(压缩速度更快,但压缩速度稍慢)。
或者还有 Smile 二进制 JSON,由 Jackson 在 Java 中支持(带有 这个扩展):它是紧凑的二进制格式,比 Avro 更容易使用:
基本上与 JSON 相同的代码,除了用于传递不同的格式工厂。
从数据大小的角度来看,Smile 还是 Avro 更紧凑取决于用例的细节;但两者都比 JSON 更紧凑。
这样做的好处是,它可以快速使用 JSON 和 Smile,使用相同的代码,仅使用 POJO。与 Avro 相比,Avro 要么需要代码生成,要么需要大量手动代码来打包和解包 GenericRecord。
Instead of Avro, you could also simply consider compressing data; either with gzip (good compression, higher cpu) or LZF or Snappy (much faster, bit slower compression).
Or alternatively there is also Smile binary JSON, supported in Java by Jackson (with this extension): it is compact binary format, and much easier to use than Avro:
basically same code as with JSON, except for passing different format factory.
From data size perspective, whether Smile or Avro is more compact depends on details of use case; but both are more compact than JSON.
Benefit there is that this works fast with both JSON and Smile, with same code, using just POJOs. Compared to Avro which either requires code generation, or lots of manual code to pack and unpack
GenericRecord
s.更新了答案。
Kafka 有一个带有 Maven(SBT 格式)坐标的 Avro 序列化器/反序列化器:
您将 KafkaAvroSerializer 的实例传递到 KafkaProducer 构造函数中。
然后,您可以创建 Avro GenericRecord 实例,并将它们用作 Kafka ProducerRecord 实例中的值,您可以使用 KafkaProducer 发送这些值。
在 Kafka 消费者端,您使用 KafkaAvroDeserializer 和 KafkaConsumer。
Updated Answer.
Kafka has an Avro serializer/deserializer with Maven (SBT formatted) coordinates:
You pass an instance of KafkaAvroSerializer into the KafkaProducer constructor.
Then you can create Avro GenericRecord instances, and use those as values inside Kafka ProducerRecord instances which you can send with KafkaProducer.
On the Kafka consumer side, you use KafkaAvroDeserializer and KafkaConsumer.