Spark Streaming 将数据写入 Kafka 主题

发布于 2025-01-14 15:03:38 字数 739 浏览 4 评论 0原文

我正在尝试为每个 RDD 向 Kafka 主题写入一个数据帧。 我正在使用下面的代码:

 mesg.foreachRDD(rdd => { Dataframe.write.format("kafka")
    .option("kafka.bootstrap.servers","host")
    .option("subscribe","topic")
    .option("principal","Kerberos-principal")
    .option("keytab","kerberos-keytab")
    .save()
    })

enter code here

我收到空指针异常。具体来说,我需要向 Kafka Topic 写入一个数据框。任何人都可以帮忙解决这个问题吗?注意这里的Dataframe是在将rdd转换为dataframe并从发送到Kafka Topic的输入json中删除一些字段后获得的。

线程“main”中的异常 java.lang.NullPointerException 在 java.util.regex.Matcher.getTextLength(Matcher.java:1283) 在 java.util.regex.Matcher.reset(Matcher.java309) 在 java.util .regex.Matcher.(Matcher.java:229) at java.util.regex.Pattern.matcher(Pattern.java:1093)

I am trying to write a data frame to Kafka topic inside for each RDD.
I am using below code:

 mesg.foreachRDD(rdd => { Dataframe.write.format("kafka")
    .option("kafka.bootstrap.servers","host")
    .option("subscribe","topic")
    .option("principal","Kerberos-principal")
    .option("keytab","kerberos-keytab")
    .save()
    })

enter code here

I am getting null pointer exception. Specifically I need to write a data frame to Kafka Topic. Can anyone help on this. Note Dataframe here is obtained after converting rdd to dataframe and removing some fields from input json sent to Kafka Topic.

Exception in thread "main" java.lang.NullPointerException at java.util.regex.Matcher.getTextLength(Matcher.java:1283) at java.util.regex.Matcher.reset(Matcher.java309) at java.util.regex.Matcher.<init>(Matcher.java:229) at java.util.regex.Pattern.matcher(Pattern.java:1093)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

葬花如无物 2025-01-21 15:03:38

空指针异常是由于配置错误造成的,现已解决。要将 RDD 中的数据帧写入 kafka 主题,请遵循以下方法:

import sparkSession.implicts._

val df = Original Dataframe.select(col("one column name"),to_json(struct($"*"))).toDF("key","value")

df.write.format("kafka").option("bootstrap-server-properties",value from config).option("topic",value from config).save()

The null pointer exception was due to config error which has been resolved. For wrting dataframe to kafka topic from RDD please follow the below approach:

import sparkSession.implicts._

val df = Original Dataframe.select(col("one column name"),to_json(struct(
quot;*"))).toDF("key","value")

df.write.format("kafka").option("bootstrap-server-properties",value from config).option("topic",value from config).save()
囍笑 2025-01-21 15:03:38

注意:如果您想避免在 select 语句中对列名称进行硬编码,请遵循以下方法:

val df = Original Dataframe.select(to_json(struct($"*"))).as("value").selectExpr(" CAST(value as STRING)")

df.write.format("kafka").option("bootstrap-server-properties",来自配置的值).option("topic",来自配置的值配置).save()

Note: If you want to avoid hard coding of column name in select statement then follow this approach:

val df = Original Dataframe.select(to_json(struct($"*"))).as("value").selectExpr("CAST(value as STRING)")

df.write.format("kafka").option("bootstrap-server-properties",value from config).option("topic",value from config).save()

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文