使用UDF时,无效的参数,而不是字符串或列

发布于 2025-01-27 20:15:57 字数 1295 浏览 2 评论 0原文

我有一个数据质量类可以在DF上进行检查。我使用此类定义的方法来运行这些检查(它们总是返回3个单元)。这些方法是由我想从另一个df调用的UDF调用的:

@F.udf(StructType())
def dq_check_wrapper(df, col, _test):
  
  if _test == 'is_null':
    return Valid_df(df).is_not_null(col).execute()

  elif _test == 'unique':
    return Valid_df(df).is_unique(col).execute()

说我想评估DQ上的DQ:

df = spark.createDataFrame(
  [
     (None, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
    ,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
    ,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
  ], ['HR', 'maxABP', 'Second']
)

为了使其动态,我想使用元数据DF:

metadata = sqlContext.sql("select 'HR' as col, 'is_null' as dq_check")

+---+--------+
|col|dq_check|
+---+--------+
| HR| is_null|
+---+--------+

但是,当我尝试时,我会得到:

metadata\
  .withColumn("valid_dq", dq_check_wrapper(df, metadata.col, metadata.dq_check))\
  .show()

我得到一个typeerror

TypeError: Invalid argument, not a string or column: DataFrame[HR: bigint, maxABP: double, Second: bigint] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

为什么?

I have a data quality class to perform checks on a df. I use methods defined in this class to run these checks (they always return tuples of 3). These methods are called by a udf that I want to call from another df:

@F.udf(StructType())
def dq_check_wrapper(df, col, _test):
  
  if _test == 'is_null':
    return Valid_df(df).is_not_null(col).execute()

  elif _test == 'unique':
    return Valid_df(df).is_unique(col).execute()

Say I want to asses the DQ on this df:

df = spark.createDataFrame(
  [
     (None, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
    ,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
    ,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
  ], ['HR', 'maxABP', 'Second']
)

To make it dynamic, I want to use a metadata df:

metadata = sqlContext.sql("select 'HR' as col, 'is_null' as dq_check")

+---+--------+
|col|dq_check|
+---+--------+
| HR| is_null|
+---+--------+

But then, when I try:

metadata\
  .withColumn("valid_dq", dq_check_wrapper(df, metadata.col, metadata.dq_check))\
  .show()

I get a TypeError:

TypeError: Invalid argument, not a string or column: DataFrame[HR: bigint, maxABP: double, Second: bigint] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Why?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

月朦胧 2025-02-03 20:15:57

因为如果我不告知df是类型的数据框架,则它会以字符串为单词:

def dq_check_wrapper(df: DataFrame, col, _test):

Because if I don't inform that df is of type DataFrame, it infers as String:

def dq_check_wrapper(df: DataFrame, col, _test):
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文