Extracting structured binary data (from Kafka) and converting it to Integer
I am trying to ingest data into Spark from Kafka using Python. The data from Kafka is in JSON format, like this:
{"order_id": 56, "customer_id": 772, "taxful_total_price": 154}
This JSON data is continuously streaming from Kafka. What I want is to read it in PySpark and write it unchanged to the console (no aggregations yet).
Here is the code:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession \
    .builder \
    .appName("Total-spending-for-top-users") \
    .getOrCreate()

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .load()

jsonschema = StructType([StructField("order_id", IntegerType()),
                         StructField("customer_id", IntegerType()),
                         StructField("taxful_total_price", IntegerType())])

mta_stream = df.select(from_json(col("value").cast("string"), jsonschema)
                       .alias("parsed_mta_values"))
mta_data = mta_stream.select("parsed_mta_values.*")

qry = mta_data.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()
And here is the output in the console:

| order_id | customer_id | taxful_total_price |
| -------- | ----------- | ------------------ |
| null     | null        | null               |
| null     | null        | null               |
| null     | null        | null               |
| null     | null        | null               |
| null     | null        | null               |
I changed the following lines of the code as follows:
mta_stream = df.select(col("value").alias("parsed_mta_values"))
mt = mta_stream.select("parsed_mta_values")
query = mt.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
And I got the following result:
-------------------------------------------
Batch: 0
-------------------------------------------
|   parsed_mta_values|
|--------------------|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
|[22 7B 5C 22 6F 7...|
As we can see, this is binary data.
How do I convert it from binary to string? The following line from the first version of the code does not work:

mta_stream = df.select(from_json(col("value").cast("string"), jsonschema)
                       .alias("parsed_mta_values"))

Please help!
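Edit: to see what those bytes actually contain, the first few hex values from the console dump can be decoded directly in plain Python (a quick check, using only the bytes visible above):

```python
# The console shows the first bytes of `value` as 22 7B 5C 22 6F ...
# Decoding them reveals what from_json is actually being handed.
payload_start = bytes([0x22, 0x7B, 0x5C, 0x22, 0x6F])
print(payload_start.decode("utf-8"))  # prints: "{\"o
```

So the payload begins with a literal quote, then `{\"o...` — it looks like the record is a JSON *string* that itself contains escaped JSON, not a bare JSON object.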
Please try the following: