在 Spark 结构化流中解释数组 JSON 中的数据帧

发布于 2025-01-14 09:34:20 字数 482 浏览 2 评论 0原文

我的数据框中有一列下面的 json 字符串,如何分解/压平它以获得单级数据框? 目前的模式是

df
|-json_data (StringType)

如何拥有以下模式的 df ?

df
|-key1
|-key2_signal
|-key2_value

[{
    "key1": 1647336730000,
    "key2": [
      {
        "signal": "signal_key_1",
        "value": 73.6
    },
      {
        "signal": "signal_key_2",
        "value": 3.375
    },
      {
        "signal": "signal_key_3",
        "value": 13.82
    }]
}]

I am having a column of below json string in my dataframe how can I explode/flatten it to get single level dataframe ?
Currently the schema is

df
|-json_data (StringType)

how can I have a df of below schema ?

df
|-key1
|-key2_signal
|-key2_value

[{
    "key1": 1647336730000,
    "key2": [
      {
        "signal": "signal_key_1",
        "value": 73.6
    },
      {
        "signal": "signal_key_2",
        "value": 3.375
    },
      {
        "signal": "signal_key_3",
        "value": 13.82
    }]
}]

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

嗫嚅 2025-01-21 09:34:20

我想你应该这样做。

说明:

  1. 创建用于对示例数据进行练习的虚拟模型。 (df1 创建)
  2. 获取字符串列并将其转换为实际的 JSON。
  3. 选择所有字段。
  4. 分解 key2,因为它是一个列表。
  5. 从 key2 中选择所有相关键
  6. 选择 key1 和 key2 相关数据。使用星号 (*)
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
df1 = spark.createDataFrame(
    [{'json_data': [json.dumps({
        "key1": 1647336730000,
        "key2": [
            {
                "signal": "signal_key_1",
                "value": 73.6
            },
            {
                "signal": "signal_key_2",
                "value": 3.375
            },
            {
                "signal": "signal_key_3",
                "value": 13.82
            }]
    })]}],
    schema=StructType([StructField('json_data', StringType(), True)])
)
(
    df1
    .withColumn('actual', f.from_json(f.col('json_data'), f.schema_of_json(f.lit(df1.select(f.col("json_data")).first()[0]))))
    .withColumn('data', f.explode('actual'))
    .drop('actual')
    .withColumn('key2', f.explode('data.key2'))
    .drop('json_data')
    .select('data.key1', 'key2.*').show()
    )
# +-------------+------------+-----+
# |         key1|      signal|value|
# +-------------+------------+-----+
# |1647336730000|signal_key_1| 73.6|
# |1647336730000|signal_key_2|3.375|
# |1647336730000|signal_key_3|13.82|
# +-------------+------------+-----+

I guess it's how you should do it.

Explanation:

  1. Creating the dummy for practice on your sample data. (df1 creation)
  2. Take the string column and cast it to be an actual JSON.
  3. Select all of the fields.
  4. Explode key2 since it's a list.
  5. select all the relevant keys from within key2
  6. select key1 and key2 related data. using asterisk (*)
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
df1 = spark.createDataFrame(
    [{'json_data': [json.dumps({
        "key1": 1647336730000,
        "key2": [
            {
                "signal": "signal_key_1",
                "value": 73.6
            },
            {
                "signal": "signal_key_2",
                "value": 3.375
            },
            {
                "signal": "signal_key_3",
                "value": 13.82
            }]
    })]}],
    schema=StructType([StructField('json_data', StringType(), True)])
)
(
    df1
    .withColumn('actual', f.from_json(f.col('json_data'), f.schema_of_json(f.lit(df1.select(f.col("json_data")).first()[0]))))
    .withColumn('data', f.explode('actual'))
    .drop('actual')
    .withColumn('key2', f.explode('data.key2'))
    .drop('json_data')
    .select('data.key1', 'key2.*').show()
    )
# +-------------+------------+-----+
# |         key1|      signal|value|
# +-------------+------------+-----+
# |1647336730000|signal_key_1| 73.6|
# |1647336730000|signal_key_2|3.375|
# |1647336730000|signal_key_3|13.82|
# +-------------+------------+-----+
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文