在阅读时,使用Spark写给Kafka的JSON字符串未正确转换给Kafka
我读了一个.csv文件以创建数据框架,我想将数据写入Kafka主题。代码是以下内容
df = spark.read.format("csv").option("header", "true").load(f'{file_location}')
kafka_df = df.selectExpr("to_json(struct(*)) AS value").selectExpr("CAST(value AS STRING)")
kafka_df.show(truncate=False)
,数据帧看起来像这样:
value
"{""id"":""d215e9f1-4d0c-42da-8f65-1f4ae72077b3"",""latitude"":""-63.571457254062715"",""longitude"":""-155.7055842710919""}"
"{""id"":""ca3d75b3-86e3-438f-b74f-c690e875ba52"",""latitude"":""-53.36506636464281"",""longitude"":""30.069167069917597""}"
"{""id"":""29e66862-9248-4af7-9126-6880ceb3b45f"",""latitude"":""-23.767505281795835"",""longitude"":""174.593140405442""}"
"{""id"":""451a7e21-6d5e-42c3-85a8-13c740a058a9"",""latitude"":""13.02054867061598"",""longitude"":""20.328402498420786""}"
"{""id"":""09d6c11d-7aae-4d17-8cd8-183157794893"",""latitude"":""-81.48976715040848"",""longitude"":""1.1995769642056189""}"
"{""id"":""393e8760-ef40-482a-a039-d263af3379ba"",""latitude"":""-71.73949722379649"",""longitude"":""112.59922770487054""}"
"{""id"":""d6db8fcf-ee83-41cf-9ec2-5c2909c18534"",""latitude"":""-4.034680969008576"",""longitude"":""60.59645511854336""}"
我想读取它并将其写入Kafka后,并将二进制数据从列“值”转换回JSON字符串,但结果是值仅包含ID,不是整个字符串。有ideea为什么?
from pyspark.sql import functions as F
df = consume_from_event_hub(topic, bootstrap_servers, config, consumer_group)
string_df = df.select(F.col("value").cast("string"))
string_df.display()
value
794541bc-30e6-4c16-9cd0-3c5c8995a3a4
20ea5b50-0baa-47e3-b921-f9a3ac8873e2
598d2fc1-c919-4498-9226-dd5749d92fc5
86cd5b2b-1c57-466a-a3c8-721811ab6959
807de968-c070-4b8b-86f6-00a865474c35
e708789c-e877-44b8-9504-86fd9a20ef91
9133a888-2e8d-4a5a-87ce-4a53e63b67fc
cd5e3e0d-8b02-45ee-8634-7e056d49bf3b
CSV格式是
id,latitude,longitude
bd6d98e1-d1da-4f41-94ba-8dbd8c8fce42,-86.06318155350924,-108.14300138138589
c39e84c6-8d7b-4cc5-b925-68a5ea406d52,74.20752175171859,-129.9453606091319
011e5fb8-6ab7-4ee9-97bb-acafc2c71e15,19.302250885973592,-103.2154291337162
I read a .csv file to create a data frame and I want to write the data to a kafka topic. The code is the following
df = spark.read.format("csv").option("header", "true").load(f'{file_location}')
kafka_df = df.selectExpr("to_json(struct(*)) AS value").selectExpr("CAST(value AS STRING)")
kafka_df.show(truncate=False)
And the data frame looks like this:
value
"{""id"":""d215e9f1-4d0c-42da-8f65-1f4ae72077b3"",""latitude"":""-63.571457254062715"",""longitude"":""-155.7055842710919""}"
"{""id"":""ca3d75b3-86e3-438f-b74f-c690e875ba52"",""latitude"":""-53.36506636464281"",""longitude"":""30.069167069917597""}"
"{""id"":""29e66862-9248-4af7-9126-6880ceb3b45f"",""latitude"":""-23.767505281795835"",""longitude"":""174.593140405442""}"
"{""id"":""451a7e21-6d5e-42c3-85a8-13c740a058a9"",""latitude"":""13.02054867061598"",""longitude"":""20.328402498420786""}"
"{""id"":""09d6c11d-7aae-4d17-8cd8-183157794893"",""latitude"":""-81.48976715040848"",""longitude"":""1.1995769642056189""}"
"{""id"":""393e8760-ef40-482a-a039-d263af3379ba"",""latitude"":""-71.73949722379649"",""longitude"":""112.59922770487054""}"
"{""id"":""d6db8fcf-ee83-41cf-9ec2-5c2909c18534"",""latitude"":""-4.034680969008576"",""longitude"":""60.59645511854336""}"
After I wrote it to Kafka I want to read it and transform the binary data from column "value" back to json string but the result is that the value contains only the id, not the whole string. Any ideea why?
from pyspark.sql import functions as F
df = consume_from_event_hub(topic, bootstrap_servers, config, consumer_group)
string_df = df.select(F.col("value").cast("string"))
string_df.display()
value
794541bc-30e6-4c16-9cd0-3c5c8995a3a4
20ea5b50-0baa-47e3-b921-f9a3ac8873e2
598d2fc1-c919-4498-9226-dd5749d92fc5
86cd5b2b-1c57-466a-a3c8-721811ab6959
807de968-c070-4b8b-86f6-00a865474c35
e708789c-e877-44b8-9504-86fd9a20ef91
9133a888-2e8d-4a5a-87ce-4a53e63b67fc
cd5e3e0d-8b02-45ee-8634-7e056d49bf3b
the CSV the format is this
id,latitude,longitude
bd6d98e1-d1da-4f41-94ba-8dbd8c8fce42,-86.06318155350924,-108.14300138138589
c39e84c6-8d7b-4cc5-b925-68a5ea406d52,74.20752175171859,-129.9453606091319
011e5fb8-6ab7-4ee9-97bb-acafc2c71e15,19.302250885973592,-103.2154291337162
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您需要删除
selectexpr(“ cast(value as string)”)
因为to_json
已经返回了字符串列,所以我不确定消费者怎么了。除非
commume_from_event_hub
做一些专门提取ID列的事情,否则应该有效You need to remove
selectExpr("CAST(value AS STRING)")
sinceto_json
already returns a string columnI'm not sure what's wrong with the consumer. That should have worked unless
consume_from_event_hub
does something specifically to extract the ID column