Extracting structured binary data (from Kafka) and converting it to Integer

Published 2025-01-11 03:54:06 · 2597 characters · 0 views · 0 comments


I am trying to ingest data into Spark from Kafka using Python. The data from Kafka is in JSON format, like this:

{"order_id": 56, "customer_id": 772, "taxful_total_price": 154}

This JSON data streams continuously from Kafka. What I want is to read this data in PySpark code and write it unchanged to the console (no aggregations yet).

Here is the code:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'

import findspark
findspark.init()  # findspark must run before any pyspark import

from pyspark.sql.functions import from_json

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

spark = SparkSession\
    .builder\
    .appName("Total-spending-for-top-users")\
    .getOrCreate()

df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "orders")\
    .option("startingOffsets", "earliest")\
    .load()

jsonschema = StructType([StructField("order_id", IntegerType()), 
                    StructField("customer_id", IntegerType()), 
                    StructField("taxful_total_price", IntegerType())])

mta_stream = df.select(from_json(col("value").cast("string"), jsonschema) \
                            .alias("parsed_mta_values"))

mta_data = mta_stream.select("parsed_mta_values.*")

qry = mta_data.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()

And here is the output in the console:

| order_id | customer_id | taxful_total_price |
| -------- | ----------- | ------------------ |
|null      |   null      |        null        |
|null      |   null      |        null        |
|null      |   null      |        null        |
|null      |   null      |        null        |
|null      |   null      |        null        |
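A detail worth knowing here (not stated in the original post): `from_json` does not fail on malformed input; it returns null for the entire struct, which is exactly the all-null pattern above. The sample record itself is valid JSON and would parse cleanly, as a plain-Python check shows, so the bytes actually on the topic must differ from the sample:

```python
import json

# Sanity check: the sample record from the question parses fine as JSON,
# so from_json's nulls mean the real Kafka payload has a different shape.
sample = '{"order_id": 56, "customer_id": 772, "taxful_total_price": 154}'
record = json.loads(sample)
print(record["order_id"])  # 56
```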

I changed the following lines of the code as follows:

mta_stream = df.select(col("value").alias("parsed_mta_values"))
mt = mta_stream.select("parsed_mta_values")

query = mt.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

And I got the following result:

-------------------------------------------                                     
Batch: 0
-----------------------------------------

|   parsed_mta_values|
|--------------------|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|

As we can see, this is binary data.
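The hex dump reveals more than just "binary", though. Decoding the visible prefix with plain Python (dropping the truncated last byte) shows the payload begins with a quote character followed by an escaped brace, i.e. the message is a JSON-encoded *string*, not a bare JSON object:

```python
# First five complete bytes from the console dump: 22 7B 5C 22 6F
prefix = bytes.fromhex("227B5C226F").decode("utf-8")
print(prefix)  # "{\"o  -> an outer quote, then an escaped inner object
```

That outer layer of quoting would explain the nulls: after casting to string, the value is valid JSON, but it decodes to another string rather than to an object with the expected fields.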

How do I convert it from binary to string? The following, from the first code I mentioned, does not work:

mta_stream = df.select(from_json(col("value").cast("string"), jsonschema) \
                            .alias("parsed_mta_values"))

Please help!

Comments (1)

迟到的我 2025-01-18 03:54:06


Please try the following:

mta_stream = df.select(col("value").cast("string").alias("parsed_mta_values")) \
               .select(from_json(col("parsed_mta_values"), jsonschema))
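One caveat, if the hex dump in the question is representative: the value appears to be double-encoded (a JSON string whose content is escaped JSON), so a single `from_json` pass would still yield nulls and the outer layer has to be unwrapped first. A minimal pure-Python sketch of the two-pass idea, using a hypothetical payload shaped like the sample record:

```python
import json

# Hypothetical double-encoded payload, mirroring the "{\"... prefix in the hex dump
raw = '"{\\"order_id\\": 56, \\"customer_id\\": 772, \\"taxful_total_price\\": 154}"'

inner = json.loads(raw)     # first pass: unwraps the outer JSON string
record = json.loads(inner)  # second pass: parses the actual object
print(record["customer_id"])  # 772
```

In Spark, the equivalent would be to strip the outer encoding before applying `from_json` with `jsonschema`, for example with a small UDF that calls `json.loads` once; the exact step depends on how the producer serialized the records.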