如何将Pyspark DataFrame发送到Kafka主题?

发布于 2025-02-06 20:42:39 字数 1617 浏览 3 评论 0原文

Pyspark版本-2.4.7 KAFKA版本-2.13_3.2.0

HI,我是Pyspark和流媒体属性的新手。我在Internet中遇到了一些资源,但我仍然无法弄清楚如何将Pyspark数据框架发送到Kafka经纪人。我需要编写生产者代码。 我正在阅读CSV文件中的数据,并试图将其发送到Kafka主题。请帮助我使用代码和配置。

import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.functions import *
import os
from kafka import KafkaProducer

import csv

def spark_session():
    '''
    Description:
        To open a spark session. Returns a spark session object.
    '''
    spark = SparkSession \
        .builder \
        .appName("Test_Kafka_Producer") \
        .master("local[*]") \
        .getOrCreate()
    
    return spark
   
if __name__ == '__main__':

    spark = spark_session()
    topic = "Kafkatest"
    spark_version = '2.4.7'
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:{}'.format(spark_version)
 
    #producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                       #value_serializer= lambda x: x.encode('utf-8'))

    df1 = spark.read.csv("annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv", inferSchema = True, header = True)
    df1.show(10)

    print("sending df===========")

    df1.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", topic) \
    .save()

    print("End------")

我在这一点代码中遇到的错误是 py4j.protocol.py4jjavaerror:调用o41.save时发生错误。 :org.apache.spark.sql.analysisexception:未能找到数据来源:kafka。请根据“结构化流 + Kafka集成指南”的部署部分部署应用程序。

pyspark version - 2.4.7
kafka version - 2.13_3.2.0

Hi, I am new to pyspark and streaming properties. I have come across few resources in the internet, but still I am not able to figure out how to send a pyspark data frame to a kafka broker. I need to write a producer code.
I am reading the data from a csv file and trying to send it to kafka topic. Please help me out with the code and the configurations.

import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.functions import *
import os
from kafka import KafkaProducer

import csv

def spark_session():
    '''
    Description:
        To open a spark session. Returns a spark session object.
    '''
    spark = SparkSession \
        .builder \
        .appName("Test_Kafka_Producer") \
        .master("local[*]") \
        .getOrCreate()
    
    return spark
   
if __name__ == '__main__':

    spark = spark_session()
    topic = "Kafkatest"
    spark_version = '2.4.7'
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:{}'.format(spark_version)
 
    #producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                       #value_serializer= lambda x: x.encode('utf-8'))

    df1 = spark.read.csv("annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv", inferSchema = True, header = True)
    df1.show(10)

    print("sending df===========")

    df1.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", topic) \
    .save()

    print("End------")

The error that I am encountering for this bit of code is
py4j.protocol.Py4JJavaError: An error occurred while calling o41.save. : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

翻了热茶 2025-02-13 20:42:39

您不需要Spark即可读取CSV文件并在Python中运行KAFKA生产商(我看到您已经尝试导入Kafkaproducer(应该可以使用))

,例如

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer= lambda x: x.encode('utf-8'))
with open("annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv") as f:
    for i, line in enumerate(f):
        if i > 0:
            producer.send(topic, line)
producer.flush()

,如果pyspark_submit_args不起作用,因为看起来不是,您应该在CLI上使用相同的选项

spark-submit --packages ... app.py

,也可以使用config(“ spark.jars.packages”,“ ...”),如图所示以下。


You'll also need to ensure that the Kafka dataframe only has the mentioned schema, as per

任何示例之前过滤出第一个标头行

from pyspark.sql import SparkSession

scala_version = '2.12'  # TODO: Ensure this is correct
spark_version = '3.2.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0'
]
spark = SparkSession.builder\
   .master("local")\
   .appName("kafka-example")\
   .config("spark.jars.packages", ",".join(packages))\
   .getOrCreate()

# Read all lines into a single value dataframe  with column 'value'
# TODO: Replace with real file. 
df = spark.read.text('file:///tmp/data.csv')

# TODO: Remove the file header, if it exists

# Write
df.write.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("topic", "foobar")\
  .save()

使用主机上验证的

$ kcat -b localhost:9092 -C -t foobar

You don't need Spark to read a CSV file and run a Kafka Producer in Python (I see you already tried to import KafkaProducer, which should have worked)

E.g

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer= lambda x: x.encode('utf-8'))
with open("annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv") as f:
    for i, line in enumerate(f):
        if i > 0:
            producer.send(topic, line)
producer.flush()

But if PYSPARK_SUBMIT_ARGS doesn't work, as it looks like it doesn't, you should use the same option on the CLI

spark-submit --packages ... app.py

Or you can use config("spark.jars.packages", "...") on the session, as shown below.


You'll also need to ensure that the Kafka dataframe only has the mentioned schema, as per the documentation (topic, key, value, etc). In other words, all CSV columns should be encoded as one string, so you'd be better off using spark.read.text and filtering out the first header row before you produce anything

Example

from pyspark.sql import SparkSession

scala_version = '2.12'  # TODO: Ensure this is correct
spark_version = '3.2.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0'
]
spark = SparkSession.builder\
   .master("local")\
   .appName("kafka-example")\
   .config("spark.jars.packages", ",".join(packages))\
   .getOrCreate()

# Read all lines into a single value dataframe  with column 'value'
# TODO: Replace with real file. 
df = spark.read.text('file:///tmp/data.csv')

# TODO: Remove the file header, if it exists

# Write
df.write.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("topic", "foobar")\
  .save()

Verified on host with

$ kcat -b localhost:9092 -C -t foobar
你丑哭了我 2025-02-13 20:42:39

您正在尝试直接编写DF,但它是否遵循必要列的Kafka所需的架构,

请检查 this 链接以获取详细信息,然后您可能需要将dataframe编码到值列中以将其发送到kafka

You are trying to write df directly but does it follow the Kafka required schema where value column is necessary

Please check this link for details and you might then need to encode your dataframe into value column to send it to Kafka

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文