Rewrite PySpark function to combine columns into JSON
I need to get all columns of a Spark DataFrame and create another column containing JSON, with the column names as keys and the column values as values. For example, a DataFrame like this:
C1 | C2 | CN |
---|---|---|
10 | 20 | abc |
99 | | cde |
40 | 50 | |
Should be transformed to this:
C1 | C2 | CN | JSON |
---|---|---|---|
10 | 20 | abc | {"C1": 10, "C2": 20, "CN": "abc"} |
99 | | cde | {"C1": 99, "CN": "cde"} |
40 | 50 | | {"C1": 40, "C2": 50} |
The column names and their number may vary, so I can't pass them explicitly.
The strategy I'm using is:
import json

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def jsonize_fields(row):
    # Collect only the non-empty column values and serialize them as JSON.
    vars = {}
    for k, v in row.asDict().items():
        if v:
            vars[k] = v
    return json.dumps(vars)

jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn(
    'JSON', jsonize_udf(struct(*spark_data_frame.columns)))
This works well, but it degrades performance a lot, so I would like to convert it to a solution that doesn't use a UDF. Is that possible?
2 Answers
I don't believe there's a straightforward way to achieve this without using UDFs.

However, PySpark actually has a built-in to_json function for turning a STRUCT into a JSON string, so you don't need to write your own. Behind the scenes, to_json calls Spark's internal implementation of the function; that removes the overhead associated with Python UDFs and should improve performance. The usage is very similar to your custom UDF:
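A minimal sketch of that usage, reusing the spark_data_frame name from the question:

from pyspark.sql.functions import to_json, struct

# Build a struct of all columns and serialize it with Spark's native to_json,
# avoiding the Python UDF round trip.
spark_data_frame = spark_data_frame.withColumn(
    'JSON', to_json(struct(*spark_data_frame.columns)))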
Just found it:

By default, to_json ignores null values (this can be changed by passing options={"ignoreNullFields": False} as the second parameter), but not empty ones. If you want to ignore empty values as well, put this before the to_json call: