Create a DataFrame from Kafka JSON using Spark Structured Streaming in Python

I am new to Spark Structured Streaming and am working on a POC that needs to be implemented with it.

Input source: Kafka
Input format: JSON
Language: Python 3
Library: Spark 3.2

I am trying to parse the incoming JSON into a Spark DataFrame with a predefined structure.

So far I am able to fetch the JSON events and get results in the console, but not in the expected format. It would be very helpful if you could nudge me in the right direction or suggest a solution.

Below is my code so far.

JSON from Kafka:

{"property1" : "hello","property2" : "world"}

structured_kafka.py


"""
 Run the script
    `$ bin/spark-submit structured_kafka.py \
    host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    
    schema = StructType([ 
        StructField("property1", StringType(), True),
        StructField("property2" , StringType(), True),
        ])


    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))


    df = lines.select('*')
  
    # Start running the query that prints the running counts to the console
    query = df\
        .writeStream\
        .outputMode('Append')\
        .format('console')\
        .start()

    query.awaitTermination()

Output:

Batch: 1
-------------------------------------------
+--------------------+
|        parsed_value|
+--------------------+
|{hello, world}      |
+--------------------+

Expected:

+--------------------+--------------------+
| property1          | property2          |
+--------------------+--------------------+
|hello               |world               |
+--------------------+--------------------+

If I can get the df in this format, I will be able to apply my use case.

Kindly suggest.

Note: I have looked at all the existing solutions; most are either in Scala, not for Structured Streaming, or not for Kafka as a source.

Comments (1)

疯狂的代价 2025-01-20 06:44:58

After the line:

.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

add:

.select(col("parsed_value.property1"), col("parsed_value.property2"))

or:

.select(col("parsed_value.*"))