How to send a PySpark DataFrame to a Kafka topic?
PySpark version - 2.4.7
Kafka version - 2.13-3.2.0
Hi, I am new to PySpark and streaming. I have come across a few resources on the internet, but I am still not able to figure out how to send a PySpark DataFrame to a Kafka broker. I need to write the producer code.
I am reading the data from a CSV file and trying to send it to a Kafka topic. Please help me out with the code and the configuration.
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.functions import *
import os
from kafka import KafkaProducer
import csv

def spark_session():
    '''
    Description:
    To open a spark session. Returns a spark session object.
    '''
    spark = SparkSession \
        .builder \
        .appName("Test_Kafka_Producer") \
        .master("local[*]") \
        .getOrCreate()
    return spark

if __name__ == '__main__':
    spark = spark_session()
    topic = "Kafkatest"
    spark_version = '2.4.7'
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:{}'.format(spark_version)
    #producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
    #                         value_serializer=lambda x: x.encode('utf-8'))
    df1 = spark.read.csv("annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv", inferSchema=True, header=True)
    df1.show(10)
    print("sending df===========")
    df1.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", topic) \
        .save()
    print("End------")
The error that I am encountering for this bit of code is:
py4j.protocol.Py4JJavaError: An error occurred while calling o41.save.
: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
2 Answers
You don't need Spark to read a CSV file and run a Kafka producer in Python (I see you already tried to import KafkaProducer, which should have worked).
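E.g., a minimal sketch, assuming kafka-python and reusing the file name and topic from the question (the comma-joined value format is an assumption):

import csv
from kafka import KafkaProducer

# plain-Python producer; no Spark involved
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: x.encode('utf-8'))

with open('annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv') as f:
    reader = csv.reader(f)
    next(reader)  # skip the CSV header row
    for row in reader:
        # one comma-joined string per record (format is an assumption)
        producer.send('Kafkatest', ','.join(row))

producer.flush()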
But if PYSPARK_SUBMIT_ARGS doesn't work, as it looks like it doesn't (the variable is only read when the JVM starts, and the question sets it after the session has already been created), you should use the same --packages option on the CLI.
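For example (the script name here is a placeholder; note that the Scala suffix must match your Spark build, and Spark 2.4.7 is built for Scala 2.11 or 2.12, not 2.13, so the _2.13 artifact in the question will never resolve):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7 producer_app.py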
config("spark.jars.packages", "...")
on the session, as shown below.You'll also need to ensure that the Kafka dataframe only has the mentioned schema, as per the documentation (topic, key, value, etc). In other words, all CSV columns should be encoded as one string, so you'd be better off using
spark.read.text
and filtering out the first header row before you produce anythingExample
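A sketch under the same assumptions as above (Scala 2.11 build, local broker, file and topic names from the question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

scala_version = '2.11'   # must match the Scala version of your Spark build
spark_version = '2.4.7'

spark = SparkSession.builder \
    .appName("Test_Kafka_Producer") \
    .master("local[*]") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(scala_version, spark_version)) \
    .getOrCreate()

# read.text yields a single string column named 'value', which is exactly
# the schema the Kafka sink expects
df = spark.read.text("annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv")

header = df.first()[0]                   # first line of the file is the CSV header
df = df.filter(col("value") != header)   # filter it out before producing

df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "Kafkatest") \
    .save()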
Verified on the host with a console consumer, e.g.:
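(Assuming kcat, formerly kafkacat, is installed; kafka-console-consumer.sh works just as well.)

kcat -b localhost:9092 -t Kafkatest -C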
You are trying to write the df directly, but does it follow the schema Kafka requires, where a value column is necessary?
Please check this link for details; you might then need to encode your dataframe into a value column to send it to Kafka, as in the sketch below.
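For instance, one hedged way to do that encoding, reusing df1 and topic from the question (concat_ws is one option; to_json(struct(...)) is another common choice):

from pyspark.sql.functions import concat_ws, col

# cast each column to string and join them into the single 'value'
# column required by the Kafka sink
kafka_df = df1.select(
    concat_ws(",", *[col(c).cast("string") for c in df1.columns]).alias("value"))

kafka_df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", topic) \
    .save()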