Spark Streaming: writing data to a Kafka topic
I am trying to write a DataFrame to a Kafka topic inside foreachRDD, using the code below:
mesg.foreachRDD(rdd => {
  Dataframe.write.format("kafka")
    .option("kafka.bootstrap.servers", "host")
    .option("subscribe", "topic")
    .option("principal", "Kerberos-principal")
    .option("keytab", "kerberos-keytab")
    .save()
})
I am getting a null pointer exception. Specifically, I need to write a DataFrame to a Kafka topic. Can anyone help with this? Note: the DataFrame here is obtained by converting the RDD to a DataFrame and removing some fields from the input JSON sent to the Kafka topic.
Exception in thread "main" java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
    at java.util.regex.Matcher.reset(Matcher.java:309)
    at java.util.regex.Matcher.<init>(Matcher.java:229)
    at java.util.regex.Pattern.matcher(Pattern.java:1093)
The null pointer exception was due to a config error, which has been resolved. To write a DataFrame to a Kafka topic from an RDD, follow the approach below.
Note: If you want to avoid hard-coding column names in the select statement, follow this approach:
val df = originalDataframe
  .select(to_json(struct($"*")).as("value"))
  .selectExpr("CAST(value AS STRING)")

df.write.format("kafka")
  .option("kafka.bootstrap.servers", /* value from config */)
  .option("topic", /* value from config */)
  .save()
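Putting the answer together with the original foreachRDD loop, a minimal sketch might look like the following. The DStream name `mesg`, the variables `bootstrapServers` and `outputTopic`, and the assumption that each record is a JSON string are all hypothetical, not taken from the original post:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}

// Sketch only: assumes a DStream[String] named `mesg` carrying JSON records,
// and that `bootstrapServers` / `outputTopic` are loaded from your config.
mesg.foreachRDD { rdd =>
  if (!rdd.isEmpty()) { // guard against empty micro-batches
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Convert the RDD of JSON strings to a DataFrame; unwanted fields
    // can be dropped here with .drop("fieldName").
    val parsed = spark.read.json(rdd.toDS())

    // Serialize every row back into a single string column named "value",
    // which is the column the Kafka sink expects.
    val out = parsed.select(to_json(struct($"*")).as("value"))

    out.write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers) // from config
      .option("topic", outputTopic)                        // from config
      .save()
  }
}
```

Note that the batch Kafka sink uses `topic` (for writing) rather than `subscribe` (which is a read option), and broker addresses must go under the `kafka.`-prefixed key; mixing these up is a common source of the kind of error described in the question.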