Writing a Spark DataFrame to Kafka as comma-separated JSON objects



I am not able to send the DataFrame as a comma-separated JSON object to Kafka for larger data sets.

Working code for a smaller data set

    df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
        .write.format("kafka")\
        .option("compression", "gzip")\
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.request.timeout.ms", 120000) \
        .option("kafka.linger.ms", 10) \
        .option("compression", "gzip")\
        .option("kafka.retries", 3) \
        .save()
    spark.stop()
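
For context, a minimal sketch of how the df used above might be constructed so the snippet is self-contained; the column names and rows are taken from the output shown below, while the SparkSession setup and package coordinates are assumptions:

    from pyspark.sql import SparkSession

    # Assumed setup: the Kafka sink needs the spark-sql-kafka package on the
    # classpath, e.g. --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version>
    spark = SparkSession.builder.appName("JsonToKafka").getOrCreate()

    columns = ["firstname", "middlename", "lastname", "id", "gender", "salary"]
    data = [
        ("James", "", "Smith", "36636", "M", 3000),
        ("Michael", "Rose", "", "40288", "M", 4000),
        ("Robert", "", "Williams", "42114", "M", 4000),
        ("Maria", "Anne", "Jones", "39192", "F", 4000),
        ("Satish", "Anjaneyapp", "Brown", "", "F", -1),
    ]
    df = spark.createDataFrame(data, columns)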

Output

[{
    "firstname": "James",
    "middlename": "",
    "lastname": "Smith",
    "id": "36636",
    "gender": "M",
    "salary": 3000
}, {
    "firstname": "Michael",
    "middlename": "Rose",
    "lastname": "",
    "id": "40288",
    "gender": "M",
    "salary": 4000
}, {
    "firstname": "Robert",
    "middlename": "",
    "lastname": "Williams",
    "id": "42114",
    "gender": "M",
    "salary": 4000
}, {
    "firstname": "Maria",
    "middlename": "Anne",
    "lastname": "Jones",
    "id": "39192",
    "gender": "F",
    "salary": 4000
}, {
    "firstname": "Satish",
    "middlename": "Anjaneyapp",
    "lastname": "Brown",
    "id": "",
    "gender": "F",
    "salary": -1
}]

Actual Problem

For a larger data set, CAST(collect_list(to_json(struct(*))) AS STRING) collects the whole DataFrame into a single huge record and tries to send it through Kafka in one message. We get the error below:

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 51312082 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Limitation:

I can only send 1 MB per message through Kafka.

Is there a way we can break the message into chunks of at most 1 MB and still send the comma-separated JSON objects?

Tried the configurations below, but no luck (a sketch of how they are passed appears after the list):

kafka.linger.ms

batch.size
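
For reference, a minimal sketch of how those settings would be passed to the Kafka sink; the option values are illustrative and the rest of the write mirrors the code above. Producer configs need the kafka. prefix, and linger.ms / batch.size only control how the producer groups records on the wire, so they cannot split a single oversized record:

    # Illustrative only: producer configs go through with the "kafka." prefix.
    # linger.ms and batch.size tune batching of many small records; they do not
    # break up one record, so they cannot avoid RecordTooLargeException on their
    # own, and raising kafka.max.request.size would still hit the 1 MB per-message limit.
    df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.linger.ms", "10") \
        .option("kafka.batch.size", "65536") \
        .save()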


Comments (1)

眼眸里的那抹悲凉 2025-02-07 02:43:24


Don't comma-separate your JSON objects; the records then won't be valid JSON. You also shouldn't break the payload into "1 MB chunks", because then you'd be sending incomplete strings to different partitions, with no easy way to determine the ordering needed to reassemble them in any consumer.

Remove the collect_list call and instead make sure your DataFrame has a value string column containing one valid JSON object per row. The Kafka writer will then write each row as a new message.
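
A minimal sketch of that approach, assuming the same df, topic, and bootstrap servers as in the question:

    from pyspark.sql.functions import struct, to_json

    # One JSON object per row, in a string column named "value"; the Kafka
    # writer produces one message per row, so no single record approaches the
    # 1 MB limit unless an individual row itself is that large.
    df.select(to_json(struct(*df.columns)).alias("value")) \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("kafka.compression.type", "gzip") \
        .option("topic", "JsonFormat") \
        .save()

A consumer that needs one combined document can aggregate the individual objects on its own side; each message remains valid JSON on its own.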
