PySpark reading from Kafka produces corrupted JSON data

Posted on 2025-02-13 19:03:01

I'm using PySpark 2.4 to consume from Kafka, but when I cast the value column, which contains the messages, to a string,
it comes out garbled, like this: �����īVAL (see the tables below).

I am only the consumer (I do not know how the producer was written).
All I know is that the data is sent from Postgres to a Kafka topic,
that I then consume it from this topic with PySpark,
and that the data sent is supposed to be JSON.

Is there something wrong with my consumer?


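from pyspark.sql import DataFrame
from pyspark.sql import functions as F

# Batch read from the topic; key and value arrive as binary columns.
# (Excerpt from a class method: self.sparkSession, self.bootstrap_server,
# self.topic and offset_option are defined elsewhere.)
kafka_messages: DataFrame = (
    self.sparkSession.read
    .format("kafka")
    .option("kafka.bootstrap.servers", self.bootstrap_server)
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("subscribe", self.topic)
    .option("startingOffsets", offset_option)
    .load()
)


def decode_and_flatten_msg_value(df: DataFrame, schema) -> DataFrame:
    """Cast the Kafka value to a string, parse it as JSON and flatten the result."""
    return (
        df
        .select(df["value"].cast("string"))
        .select(F.from_json(F.col("value"), schema).alias("data"))
        .select("data.*")
        # Drop rows in which every column is null.
        .na.drop("all")
    )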

Here is an example of the messages before casting the value column to a string:
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|                 key|               value|               topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|[30 34 65 31 62 3...|[00 00 00 05 C0 B...|interne.pilo...|        0|    42|2022-06-27 17:28:...|            0|
|[66 62 37 61 31 3...|[00 00 00 05 C0 5...|interne.pilo...|        0|    43|2022-06-30 09:22:...|            0|
|[34 37 35 63 36 3...|[00 00 00 05 C0 5...|interne.pilo...|        0|    44|2022-06-30 17:26:...|            0|
|[62 63 63 39 39 3...|[00 00 00 05 C0 5...|interne.pilo...|        0|    45|2022-07-01 09:58:...|            0|
|[35 33 66 32 30 3...|[00 00 00 05 C0 0...|interne.pilo...|        0|    46|2022-07-05 11:39:...|            0|
|[37 32 30 30 37 3...|[00 00 00 05 C0 0...|interne.pilo...|        0|    47|2022-07-05 11:45:...|            0|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
Here is the output after the cast:
+-------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------+---------+------+-----------------------+-------------+
|key                                                                                                          |value              |topic                                                          |partition|offset|timestamp              |timestampType|
+-------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------+---------+------+-----------------------+-------------+
|[30 34 65 31 62 33 36 38 2D 63 64 66 39 2D 34 37 66 34 2D 38 65 35 34 2D 63 61 34 34 65 39 39 33 35 31 31 39]|�����īVAL
login|interne.pilot|0        |42    |2022-06-27 17:28:21.657|0            |
|[66 62 37 61 31 32 66 34 2D 65 32 32 61 2D 34 35 66 62 2D 38 65 34 65 2D 31 33 39 34 36 62 33 36 64 30 64 66]|����RʫVAL
login |interne.pilot|0        |43    |2022-06-30 09:22:58.05 |0            |
|[34 37 35 63 36 39 37 32 2D 65 37 35 33 2D 34 61 38 64 2D 62 30 61 30 2D 35 31 38 36 63 39 38 62 64 66 63 64]|����RʫVAL
login |interne.pilot|0        |44    |2022-06-30 17:26:40.336|0            |
|[62 63 63 39 39 30 38 32 2D 38 61 34 66 2D 34 37 64 65 2D 39 61 34 64 2D 63 34 32 30 33 32 31 34 30 63 62 33]|����R̫VAL
login |interne.pilot|0        |45    |2022-07-01 09:58:43.243|0            |
|[35 33 66 32 30 35 38 62 2D 64 38 63 65 2D 34 65 66 62 2D 39 61 62 61 2D 63 65 37 66 38 38 31 36 31 37 66 32]|����
ԫVAL
login |interne.pilot|0        |46    |2022-07-05 11:39:15.865|0            |
|[37 32 30 30 37 30 33 39 2D 61 62 62 61 2D 34 65 61 32 2D 61 34 39 63 2D 36 30 32 30 36 62 65 62 36 62 37 38]|����ԫVAL
login |interne.pilot|0        |47    |2022-07-05 11:45:07.092|0            |
+-------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------+---------+------+-----------------------+-------------+
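
For what it's worth, the garbled output can be reproduced outside Spark. The following is a minimal pure-Python sketch, not the Spark code path itself: it assumes that casting a binary column to a string behaves roughly like a UTF-8 decode, it uses the bytes of the first value (offset 42) from the full key/value display in this post, and `parses_as_json` is a hypothetical helper added only for illustration.

```python
import json

# Value bytes of the first message (offset 42), copied from the full display.
raw = bytes.fromhex("00 00 00 05 C0 B4 02 C4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E")

# Assumption: the cast to string interprets the bytes as UTF-8.
# errors="replace" substitutes U+FFFD for sequences that are not valid UTF-8.
text = raw.decode("utf-8", errors="replace")
print(repr(text))


def parses_as_json(s):
    """Return True if s is a valid JSON document (hypothetical helper)."""
    try:
        json.loads(s)
        return True
    except ValueError:
        return False


print(parses_as_json(text))
```

The decoded text contains replacement characters, control characters and a line break (byte 0A) before "login", which is also why each row of the table above wraps. It is not parseable JSON. If Spark behaves the same way, from_json would presumably return null for every row and the final .na.drop('all') would then discard them all, so the result would be an empty DataFrame rather than an error.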


Full display of the key and value columns:
+-------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+
|key                                                                                                          |value                                                        |
+-------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+
|[30 34 65 31 62 33 36 38 2D 63 64 66 39 2D 34 37 66 34 2D 38 65 35 34 2D 63 61 34 34 65 39 39 33 35 31 31 39]|[00 00 00 05 C0 B4 02 C4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E]|
|[66 62 37 61 31 32 66 34 2D 65 32 32 61 2D 34 35 66 62 2D 38 65 34 65 2D 31 33 39 34 36 62 33 36 64 30 64 66]|[00 00 00 05 C0 52 CA AB 02 06 56 41 4C 0A 6C 6F 67 69 6E]   |
|[34 37 35 63 36 39 37 32 2D 65 37 35 33 2D 34 61 38 64 2D 62 30 61 30 2D 35 31 38 36 63 39 38 62 64 66 63 64]|[00 00 00 05 C0 52 CA AB 02 06 56 41 4C 0A 6C 6F 67 69 6E]   |
|[62 63 63 39 39 30 38 32 2D 38 61 34 66 2D 34 37 64 65 2D 39 61 34 64 2D 63 34 32 30 33 32 31 34 30 63 62 33]|[00 00 00 05 C0 52 CC AB 02 06 56 41 4C 0A 6C 6F 67 69 6E]   |
|[35 33 66 32 30 35 38 62 2D 64 38 63 65 2D 34 65 66 62 2D 39 61 62 61 2D 63 65 37 66 38 38 31 36 31 37 66 32]|[00 00 00 05 C0 0A D4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E]   |
|[37 32 30 30 37 30 33 39 2D 61 62 62 61 2D 34 65 61 32 2D 61 34 39 63 2D 36 30 32 30 36 62 65 62 36 62 37 38]|[00 00 00 05 C0 06 D4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E]   |
+-------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+
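
One observation on the bytes above: none of the values starts with `{` (0x7B), as a JSON object would. Each one starts with 00 followed by the same four bytes 00 00 05 C0, which resembles the Confluent Schema Registry wire format (a zero magic byte, a 4-byte big-endian schema ID, then an Avro-encoded body). This is an assumption, since nothing in the post says how the producer serializes. The sketch below splits the first row along those lines in plain Python. The field layout (two longs followed by two strings) is a guess that happens to consume the bytes exactly; the real schema would have to come from whoever runs the producer. `read_long` and `read_string` are hypothetical helpers.

```python
import struct
from datetime import date, timedelta


def read_long(buf, pos):
    """Decode one Avro int/long (zigzag varint) at pos; return (value, next_pos)."""
    acc, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this number
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos  # undo the zigzag encoding


def read_string(buf, pos):
    """Decode one Avro string (length as a long, then UTF-8 bytes)."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# First row of the full key/value display (offset 42).
key = bytes.fromhex(
    "30 34 65 31 62 33 36 38 2D 63 64 66 39 2D 34 37 66 34 2D "
    "38 65 35 34 2D 63 61 34 34 65 39 39 33 35 31 31 39"
)
value = bytes.fromhex("00 00 00 05 C0 B4 02 C4 AB 02 06 56 41 4C 0A 6C 6F 67 69 6E")

print(key.decode("utf-8"))  # the key is plain text

# Assumed header: 1 magic byte + 4-byte big-endian schema ID.
magic, schema_id = struct.unpack(">bI", value[:5])
body = value[5:]

# Guessed layout: long, long, string, string.
pos = 0
first, pos = read_long(body, pos)
second, pos = read_long(body, pos)
third, pos = read_string(body, pos)
fourth, pos = read_string(body, pos)
fields = (first, second, third, fourth)

# Purely a guess: read the second number as days since 1970-01-01.
maybe_date = date(1970, 1, 1) + timedelta(days=second)

print(magic, schema_id, fields, maybe_date, pos == len(body))
```

Under these assumptions the key decodes to a UUID string, the header gives magic byte 0 and schema ID 1472, and the body yields 154, 19170, "VAL" and "login". Read as a day count, 19170 is 2022-06-27, the same date as the message timestamp, which fits an Avro date field but proves nothing on its own. If the topic really is Avro behind a schema registry, the consumer would need to drop the 5-byte header and decode the rest with the registered schema instead of using from_json.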

