重写Pyspark函数以加入JSON中的列

发布于 2025-02-10 20:57:57 字数 1282 浏览 1 评论 0原文

我需要获取Spark DataDrame的所有列,并创建其他列作为JSON,其中包含键和值作为列名称和值。例如,类似的数据帧:

C1C2CN
1020ABC
99CDE
4050

应该转换为以下方式:

C1C2CNJSON
1020ABC{“ C1”:10,“ C2”:20,“ CN”:“ ABC “}
30def{“ c1”:99,“ cn”:“ cde”}
4050{“ c1”:99,c2:50}

列的名称和数字可能会有所不同,所以我不能明确地通过它。 我正在使用的策略是:

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def jsonize_fields(row):
    vars = {}
    for k, v in row.asDict().items():
        if v:
            vars[k] = v
    return json.dumps(vars)

jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn('JSON',
jsonize_udf(struct(*spark_data_frame.columns)))

这效果很好,但是它会使性能降低很多。因此,我想将其转换为不使用UDF的解决方案。是否可以?

I need to get all columns of a Spark Datadrame and create other column as a json having keys and values as column names and values. For example, a Dataframe like this:

C1C2CN
1020abc
99cde
4050

Should be transformed to this:

C1C2CNJSON
1020abc{ "C1": 10, "C2": 20, "CN": "abc"}
30def{ "C1": 99, "CN": "cde"}
4050{ "C1": 99, C2: 50}

The columns names and number may vary, so I can't pass it explicitly.
The strategy I'm using is:

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def jsonize_fields(row):
    vars = {}
    for k, v in row.asDict().items():
        if v:
            vars[k] = v
    return json.dumps(vars)

jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn('JSON',
jsonize_udf(struct(*spark_data_frame.columns)))

This works well, but it degraded the performance a lot. So, I would like to convert it to a solution that doesn't use UDF. Is it possible?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

爱给你人给你 2025-02-17 20:57:57

我不认为无需使用UDF即可实现这一目标的直接方法。

但是,Pyspark实际上具有一个内置的 to_json 用于将struct> struct变成JSON字符串的功能。您不需要写自己的书。

在幕后,to_json将调用Spark的内部实现该功能。反过来,这消除了与Python UDF相关的开销,并应提高性能。

用法与您的自定义UDF非常相似:

from pyspark.sql.functions import struct, to_json

spark_data_frame = spark_data_frame.withColumn(
    'JSON',
    to_json(struct(*spark_data_frame.columns))
)

I don't believe there's a straightforward way to achieve this without using UDFs.

However, PySpark actually has a built-in to_json function for turning a STRUCT into a JSON string. You don't need to write your own.

Behind the scenes, to_json will call Spark's internal implementation of the function. In turn, that removes the overhead associated with Python UDFs and should improve performance.

The usage is very similar to your custom UDF:

from pyspark.sql.functions import struct, to_json

spark_data_frame = spark_data_frame.withColumn(
    'JSON',
    to_json(struct(*spark_data_frame.columns))
)
孤云独去闲 2025-02-17 20:57:57

刚刚找到它:

from pyspark.sql.functions import to_json

spark_data_frame = spark_data_frame.withColumn('JSON',
    to_json(struct(*spark_data_frame.columns)))

默认情况下,to_json忽略null值(可以通过使用AS AS second Parameter options = {“ Indegorenullfields”:false})),而不是空的一个。
如果您也想忽略空值,请在以下内容之前:

from pyspark.sql.functions import col,when

spark_data_frame = spark_data_frame.select(
    [when(col(c)=="",None).otherwise(col(c)).alias(c) for c in spark_data_frame.columns])

Just found it:

from pyspark.sql.functions import to_json

spark_data_frame = spark_data_frame.withColumn('JSON',
    to_json(struct(*spark_data_frame.columns)))

By default, to_json ignore null values (it can be changed by using as second parameter options={"ignoreNullFields": False})), but not empty ones.
If you want to ignore empty values also, put it before:

from pyspark.sql.functions import col,when

spark_data_frame = spark_data_frame.select(
    [when(col(c)=="",None).otherwise(col(c)).alias(c) for c in spark_data_frame.columns])
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文