在阅读时,使用Spark写给Kafka的JSON字符串未正确转换给Kafka

发布于 2025-02-11 23:04:43 字数 2108 浏览 1 评论 0原文

我读了一个.csv文件以创建数据框架,我想将数据写入Kafka主题。代码是以下内容

df = spark.read.format("csv").option("header", "true").load(f'{file_location}')
kafka_df = df.selectExpr("to_json(struct(*)) AS value").selectExpr("CAST(value AS STRING)")
kafka_df.show(truncate=False)

,数据帧看起来像这样:

value
"{""id"":""d215e9f1-4d0c-42da-8f65-1f4ae72077b3"",""latitude"":""-63.571457254062715"",""longitude"":""-155.7055842710919""}"
"{""id"":""ca3d75b3-86e3-438f-b74f-c690e875ba52"",""latitude"":""-53.36506636464281"",""longitude"":""30.069167069917597""}"
"{""id"":""29e66862-9248-4af7-9126-6880ceb3b45f"",""latitude"":""-23.767505281795835"",""longitude"":""174.593140405442""}"
"{""id"":""451a7e21-6d5e-42c3-85a8-13c740a058a9"",""latitude"":""13.02054867061598"",""longitude"":""20.328402498420786""}"
"{""id"":""09d6c11d-7aae-4d17-8cd8-183157794893"",""latitude"":""-81.48976715040848"",""longitude"":""1.1995769642056189""}"
"{""id"":""393e8760-ef40-482a-a039-d263af3379ba"",""latitude"":""-71.73949722379649"",""longitude"":""112.59922770487054""}"
"{""id"":""d6db8fcf-ee83-41cf-9ec2-5c2909c18534"",""latitude"":""-4.034680969008576"",""longitude"":""60.59645511854336""}"

我想读取它并将其写入Kafka后,并将二进制数据从列“值”转换回JSON字符串,但结果是值仅包含ID,不是整个字符串。有ideea为什么?

from pyspark.sql import functions as F

df = consume_from_event_hub(topic, bootstrap_servers, config, consumer_group)
string_df = df.select(F.col("value").cast("string"))
string_df.display()
value
794541bc-30e6-4c16-9cd0-3c5c8995a3a4
20ea5b50-0baa-47e3-b921-f9a3ac8873e2
598d2fc1-c919-4498-9226-dd5749d92fc5
86cd5b2b-1c57-466a-a3c8-721811ab6959
807de968-c070-4b8b-86f6-00a865474c35
e708789c-e877-44b8-9504-86fd9a20ef91
9133a888-2e8d-4a5a-87ce-4a53e63b67fc
cd5e3e0d-8b02-45ee-8634-7e056d49bf3b

CSV格式是

id,latitude,longitude 
bd6d98e1-d1da-4f41-94ba-8dbd8c8fce42,-86.06318155350924,-108.14300138138589
c39e84c6-8d7b-4cc5-b925-68a5ea406d52,74.20752175171859,-129.9453606091319
011e5fb8-6ab7-4ee9-97bb-acafc2c71e15,19.302250885973592,-103.2154291337162

I read a .csv file to create a data frame and I want to write the data to a kafka topic. The code is the following

df = spark.read.format("csv").option("header", "true").load(f'{file_location}')
kafka_df = df.selectExpr("to_json(struct(*)) AS value").selectExpr("CAST(value AS STRING)")
kafka_df.show(truncate=False)

And the data frame looks like this:

value
"{""id"":""d215e9f1-4d0c-42da-8f65-1f4ae72077b3"",""latitude"":""-63.571457254062715"",""longitude"":""-155.7055842710919""}"
"{""id"":""ca3d75b3-86e3-438f-b74f-c690e875ba52"",""latitude"":""-53.36506636464281"",""longitude"":""30.069167069917597""}"
"{""id"":""29e66862-9248-4af7-9126-6880ceb3b45f"",""latitude"":""-23.767505281795835"",""longitude"":""174.593140405442""}"
"{""id"":""451a7e21-6d5e-42c3-85a8-13c740a058a9"",""latitude"":""13.02054867061598"",""longitude"":""20.328402498420786""}"
"{""id"":""09d6c11d-7aae-4d17-8cd8-183157794893"",""latitude"":""-81.48976715040848"",""longitude"":""1.1995769642056189""}"
"{""id"":""393e8760-ef40-482a-a039-d263af3379ba"",""latitude"":""-71.73949722379649"",""longitude"":""112.59922770487054""}"
"{""id"":""d6db8fcf-ee83-41cf-9ec2-5c2909c18534"",""latitude"":""-4.034680969008576"",""longitude"":""60.59645511854336""}"

After I wrote it to Kafka I want to read it and transform the binary data from column "value" back to json string but the result is that the value contains only the id, not the whole string. Any ideea why?

from pyspark.sql import functions as F

df = consume_from_event_hub(topic, bootstrap_servers, config, consumer_group)
string_df = df.select(F.col("value").cast("string"))
string_df.display()
value
794541bc-30e6-4c16-9cd0-3c5c8995a3a4
20ea5b50-0baa-47e3-b921-f9a3ac8873e2
598d2fc1-c919-4498-9226-dd5749d92fc5
86cd5b2b-1c57-466a-a3c8-721811ab6959
807de968-c070-4b8b-86f6-00a865474c35
e708789c-e877-44b8-9504-86fd9a20ef91
9133a888-2e8d-4a5a-87ce-4a53e63b67fc
cd5e3e0d-8b02-45ee-8634-7e056d49bf3b

the CSV the format is this

id,latitude,longitude 
bd6d98e1-d1da-4f41-94ba-8dbd8c8fce42,-86.06318155350924,-108.14300138138589
c39e84c6-8d7b-4cc5-b925-68a5ea406d52,74.20752175171859,-129.9453606091319
011e5fb8-6ab7-4ee9-97bb-acafc2c71e15,19.302250885973592,-103.2154291337162

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

吻风 2025-02-18 23:04:43

您需要删除selectexpr(“ cast(value as string)”)因为to_json已经返回了字符串列,所以

from pyspark.sql.functions import col, to_json, struct
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(f'{file_location}')
kafka_df = df.select(to_json(struct(col("*"))).alias("value"))
kafka_df.show(truncate=False)

我不确定消费者怎么了。除非commume_from_event_hub做一些专门提取ID列的事情,否则应该有效

You need to remove selectExpr("CAST(value AS STRING)") since to_json already returns a string column

from pyspark.sql.functions import col, to_json, struct
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(f'{file_location}')
kafka_df = df.select(to_json(struct(col("*"))).alias("value"))
kafka_df.show(truncate=False)

I'm not sure what's wrong with the consumer. That should have worked unless consume_from_event_hub does something specifically to extract the ID column

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文