PySpark reading from Kafka produces corrupted JSON data
I'm using PySpark 2.4 to consume from Kafka, but when I cast the value column (which contains the messages) to a string, it comes out garbled, like �����īVAL in the table below.
I'm only consuming; I don't know how the producer was written. I only know that the data is sent from PostgreSQL to a Kafka topic, that I then consume it from that topic with PySpark, and that the sent data is supposed to be JSON.
Is there something wrong with my consumer?
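from pyspark.sql import DataFrame
from pyspark.sql import functions as F

# Batch read from the Kafka topic; key and value arrive as binary columns.
kafka_messages : DataFrame = self.sparkSession.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", self.bootstrap_server) \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("subscribe", self.topic) \
    .option("startingOffsets", offset_option) \
    .load()

def decode_and_flatten_msg_value(df, schema) -> DataFrame:
    # Cast the binary value to a string, parse it as JSON with the given schema,
    # flatten the parsed struct, and drop rows in which every column is null.
    df = (
        df
        .select(df['value'].cast('string'))
        .select(F.from_json(F.col("value"), schema).alias("data"))
        .select("data.*")
        .na
        .drop('all')
    )
    return df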
Here is an example of the messages before casting the value column to a string:
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|[30 34 65 31 62 3...|[00 00 00 05 C0 B...|interne.pilo...| 0| 42|2022-06-27 17:28:...| 0|
|[66 62 37 61 31 3...|[00 00 00 05 C0 5...|interne.pilo...| 0| 43|2022-06-30 09:22:...| 0|
|[34 37 35 63 36 3...|[00 00 00 05 C0 5...|interne.pilo...| 0| 44|2022-06-30 17:26:...| 0|
|[62 63 63 39 39 3...|[00 00 00 05 C0 5...|interne.pilo...| 0| 45|2022-07-01 09:58:...| 0|
|[35 33 66 32 30 3...|[00 00 00 05 C0 0...|interne..pilo..| 0| 46|2022-07-05 11:39:...| 0|
|[37 32 30 30 37 3...|[00 00 00 05 C0 0...|interne.pilo...| 0| 47|2022-07-05 11:45:...| 0|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
Here they are after the cast:
+-------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------+---------+------+-----------------------+-------------+
|key |value |topic |partition|offset|timestamp |timestampType|
+-------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------+---------+------+-----------------------+-------------+
|[30 34 65 31 62 33 36 38 2D 63 64 66 39 2D 34 37 66 34 2D 38 65 35 34 2D 63 61 34 34 65 39 39 33 35 31 31 39]|�����īVAL
login|interne.pilot|0 |42 |2022-06-27 17:28:21.657|0 |
|[66 62 37 61 31 32 66 34 2D 65 32 32 61 2D 34 35 66 62 2D 38 65 34 65 2D 31 33 39 34 36 62 33 36 64 30 64 66]|����RʫVAL
login |interne.pilot|0 |43 |2022-06-30 09:22:58.05 |0 |
|[34 37 35 63 36 39 37 32 2D 65 37 35 33 2D 34 61 38 64 2D 62 30 61 30 2D 35 31 38 36 63 39 38 62 64 66 63 64]|����RʫVAL
login |interne.pilot|0 |44 |2022-06-30 17:26:40.336|0 |
|[62 63 63 39 39 30 38 32 2D 38 61 34 66 2D 34 37 64 65 2D 39 61 34 64 2D 63 34 32 30 33 32 31 34 30 63 62 33]|����R̫VAL
login |interne.pilot|0 |45 |2022-07-01 09:58:43.243|0 |
|[35 33 66 32 30 35 38 62 2D 64 38 63 65 2D 34 65 66 62 2D 39 61 62 61 2D 63 65 37 66 38 38 31 36 31 37 66 32]|����
ԫVAL
login |interne.pilot|0 |46 |2022-07-05 11:39:15.865|0 |
|[37 32 30 30 37 30 33 39 2D 61 62 62 61 2D 34 65 61 32 2D 61 34 39 63 2D 36 30 32 30 36 62 65 62 36 62 37 38]|����ԫVAL
login |interne.pilot|0 |47 |2022-07-05 11:45:07.092|0 |
+-------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------+---------+------+-----------------------+-------------+
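The garbled characters are consistent with those raw bytes being interpreted as UTF-8 text. A minimal sketch in plain Python (no Spark involved), using the first value from the full key/value display in this post; treating the string cast as a UTF-8 decode is an assumption made for illustration, and the variable names are hypothetical:

```python
# First value from the full key/value display (hex bytes copied from the post).
raw = bytes.fromhex("00 00 00 05 C0 B4 02 C4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E")

# Decode as UTF-8, substituting U+FFFD for byte sequences that are not valid UTF-8.
text = raw.decode("utf-8", errors="replace")
print(repr(text))

# A JSON document would normally start with '{' or '[' (after optional whitespace).
looks_like_json = raw.lstrip()[:1] in (b"{", b"[")
print(looks_like_json)
```

The decoded text contains replacement characters followed by VAL, a newline (byte 0A), and login, which mirrors the broken rows in the table above. The payload also does not start like a JSON document, so a JSON parser would have nothing valid to parse here.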
Full display of the key and value columns:
+-------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+
|key |value |
+-------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+
|[30 34 65 31 62 33 36 38 2D 63 64 66 39 2D 34 37 66 34 2D 38 65 35 34 2D 63 61 34 34 65 39 39 33 35 31 31 39]|[00 00 00 05 C0 B4 02 C4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E]|
|[66 62 37 61 31 32 66 34 2D 65 32 32 61 2D 34 35 66 62 2D 38 65 34 65 2D 31 33 39 34 36 62 33 36 64 30 64 66]|[00 00 00 05 C0 52 CA AB 02 06 56 41 4C 0A 6C 6F 67 69 6E] |
|[34 37 35 63 36 39 37 32 2D 65 37 35 33 2D 34 61 38 64 2D 62 30 61 30 2D 35 31 38 36 63 39 38 62 64 66 63 64]|[00 00 00 05 C0 52 CA AB 02 06 56 41 4C 0A 6C 6F 67 69 6E] |
|[62 63 63 39 39 30 38 32 2D 38 61 34 66 2D 34 37 64 65 2D 39 61 34 64 2D 63 34 32 30 33 32 31 34 30 63 62 33]|[00 00 00 05 C0 52 CC AB 02 06 56 41 4C 0A 6C 6F 67 69 6E] |
|[35 33 66 32 30 35 38 62 2D 64 38 63 65 2D 34 65 66 62 2D 39 61 62 61 2D 63 65 37 66 38 38 31 36 31 37 66 32]|[00 00 00 05 C0 0A D4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E] |
|[37 32 30 30 37 30 33 39 2D 61 62 62 61 2D 34 65 61 32 2D 61 34 39 63 2D 36 30 32 30 36 62 65 62 36 62 37 38]|[00 00 00 05 C0 06 D4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E] |
+-------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+
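One possible reading of these bytes, which nothing in the post confirms, is that the values are not JSON text but a binary record behind a 5-byte header (one zero byte plus a 4-byte big-endian ID), as used by schema-registry-based serializers, followed by Avro-style zigzag varints and length-prefixed strings. The sketch below tests that guess on the first row in plain Python; the field layout (two integers, then two strings) is inferred from the bytes alone, and all function and variable names are hypothetical:

```python
import datetime
import struct

def read_zigzag_varint(buf, pos):
    """Read one zigzag-encoded varint; return (value, next_position)."""
    shift = 0
    result = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this varint
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos

def read_string(buf, pos):
    """Read a length-prefixed UTF-8 string; return (text, next_position)."""
    length, pos = read_zigzag_varint(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length

# First value from the full key/value display (hex bytes copied from the post).
raw = bytes.fromhex("00 00 00 05 C0 B4 02 C4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E")

# Assumed header: 1 marker byte, then a 4-byte big-endian ID.
magic, schema_id = struct.unpack(">bI", raw[:5])

# Assumed body layout: two integers followed by two strings.
pos = 5
first_int, pos = read_zigzag_varint(raw, pos)
second_int, pos = read_zigzag_varint(raw, pos)
first_str, pos = read_string(raw, pos)
second_str, pos = read_string(raw, pos)

# Guess: the second integer might be a day count since 1970-01-01.
maybe_date = datetime.date(1970, 1, 1) + datetime.timedelta(days=second_int)

print(magic, schema_id, first_int, second_int, first_str, second_str, maybe_date)
print(pos == len(raw))  # True when the assumed layout consumes every byte
```

Under these assumptions the first row decodes to marker 0, ID 1472, the integers 154 and 19170, and the strings VAL and login, with no bytes left over. Read as days since the epoch, 19170 is 2022-06-27, the same date as that record's Kafka timestamp, which may or may not be a coincidence. If the guess is right, the value would need the producer's schema to be decoded properly rather than a string cast followed by from_json.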