Writing a Spark DataFrame to Kafka as comma-separated JSON objects
I am not able to send a DataFrame as comma-separated JSON objects for a larger data set.
Working code for a smaller data set:
df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
    .write.format("kafka") \
    .option("compression", "gzip") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "JsonFormat") \
    .option("kafka.request.timeout.ms", 120000) \
    .option("kafka.linger.ms", 10) \
    .option("kafka.retries", 3) \
    .save()
spark.stop()
Output:
[{
"firstname": "James",
"middlename": "",
"lastname": "Smith",
"id": "36636",
"gender": "M",
"salary": 3000
}, {
"firstname": "Michael",
"middlename": "Rose",
"lastname": "",
"id": "40288",
"gender": "M",
"salary": 4000
}, {
"firstname": "Robert",
"middlename": "",
"lastname": "Williams",
"id": "42114",
"gender": "M",
"salary": 4000
}, {
"firstname": "Maria",
"middlename": "Anne",
"lastname": "Jones",
"id": "39192",
"gender": "F",
"salary": 4000
}, {
"firstname": "Satish",
"middlename": "Anjaneyapp",
"lastname": "Brown",
"id": "",
"gender": "F",
"salary": -1
}]
Actual Problem
For a larger data set, CAST(collect_list(to_json(struct(*))) AS STRING) tries to collect the whole dataset into a single Kafka record, and we get the error below:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 51312082 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Limitation:
I can send at most 1 MB per message through Kafka.
Is there a way to break the message into chunks of up to 1 MB and still send comma-separated JSON objects?
I tried the configurations below, but with no luck (a sketch of how they are passed is shown after the list):
kafka.linger.ms
batch.size
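For reference, this is how those producer settings are passed through the Spark Kafka writer's kafka.-prefixed passthrough options (a sketch reusing the broker and topic from the question; the kafka.batch.size value is a placeholder, not something from the original code):

# Sketch: Kafka producer configs are forwarded by the Spark Kafka sink
# when given with a "kafka." prefix.
df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
    .write.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "JsonFormat") \
    .option("kafka.linger.ms", 10) \
    .option("kafka.batch.size", 1048576) \
    .save()

Note that linger.ms and batch.size only control how multiple records are batched into a single producer request; they do not split one oversized record, which is why they did not help here.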
1 Answer
Don't comma-separate your JSON objects; the records would not be valid JSON. You also shouldn't break the payload into "1 MB chunks", because then you would be sending incomplete strings to different partitions, with no easy way to determine the ordering needed to put them back together in any consumer.
Remove the collect_list call and instead make sure your DataFrame has a value string column containing one valid JSON object per row. The Kafka writer will then write each row as a separate message.
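A minimal sketch of that approach, reusing the df, broker, and topic from the question (the kafka.compression.type option is an assumption about how you would pass the producer's compression setting; the other options mirror the original write):

# Sketch: one JSON object per row, so each row becomes one Kafka message.
df.selectExpr("to_json(struct(*)) AS value") \
    .write.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "JsonFormat") \
    .option("kafka.compression.type", "gzip") \
    .option("kafka.request.timeout.ms", 120000) \
    .option("kafka.retries", 3) \
    .save()

Each message stays under the 1 MB limit as long as a single JSON object does, and every message is itself valid JSON, so consumers can parse records independently.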