Vectorized pandas UDF in PySpark with dictionary lookup

Posted on 2025-01-15 06:46:53

I'm trying to learn to use pandas_udf in PySpark (Databricks).

One of the assignments is to write a pandas_udf to sort by day of the week. I know how to do this using a Spark UDF:

from pyspark.sql.functions import *

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
print('Original')
df.show()


@udf()
def udf(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day] + '-' + day

print('with spark udf')
final_df = df.select(col('avg_users'), udf(col('day')).alias('day')).sort('day')
final_df.show()

Prints:

Original
+---+-----------+
|day|  avg_users|
+---+-----------+
|Sun|   282905.5|
|Mon|   238195.5|
|Thu|   264620.0|
|Sat|   278482.0|
|Wed|   227214.0|
+---+-----------+

with spark udf
+-----------+-----+
|  avg_users|  day|
+-----------+-----+
|   238195.5|1-Mon|
|   227214.0|3-Wed|
|   264620.0|4-Thu|
|   278482.0|6-Sat|
|   282905.5|7-Sun|
+-----------+-----+

Trying to do the same with pandas_udf:

import pandas as pd


@pandas_udf('string')
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day.str] + '-' + day.str


p_final_df = df.select(df.avg_users, p_udf(df.day))

print('with pandas udf')
p_final_df.show()

I get KeyError: <pandas.core.strings.accessor.StringMethods object at 0x7f31197cd9a0>. I think it's coming from dow[day.str], which kinda makes sense.

I also tried:

return dow[day.str.__str__()] + '-' + day.str # KeyError: .... StringMethods
return dow[str(day.str)] + '-' + day.str      # KeyError: .... StringMethods
return dow[day.str.upper()] + '-' + day.str   # TypeError: unhashable type: 'Series'
return f"{dow[day.str]}-{day.str}"            # KeyError: .... StringMethods (but I think this is logically
                                              # wrong, returning a string instead of a Series)
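
For reference, the failures listed above can be reproduced in plain pandas, without Spark. This is a minimal sketch, assuming a small hand-built Series; the `lookup_error` helper is hypothetical and exists only to report which exception a given key triggers:

```python
import pandas as pd

dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
       "Fri": "5", "Sat": "6", "Sun": "7"}
day = pd.Series(["Sun", "Mon", "Thu"])


def lookup_error(key):
    """Hypothetical helper: name of the exception raised by dow[key], or None."""
    try:
        dow[key]
    except (KeyError, TypeError) as exc:
        return type(exc).__name__
    return None


# day.str is an accessor object, not a string, so it is never a key in dow.
print(lookup_error(day.str))
# A whole Series is unhashable, so it cannot be used as a dict key at all.
print(lookup_error(day.str.upper()))
# A single element is a plain str, which is what the row-at-a-time udf receives.
print(lookup_error(day.iloc[0]))
```

In other words, a dict lookup needs one hashable key at a time, while a pandas_udf receives the whole batch as a Series.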

I've read:

Comments (2)

暖树树初阳… 2025-01-22 06:46:53

Using the .str accessor on its own, without calling an actual vectorized string method on it, is what gave you the error. Also, you cannot use a whole Series as a key for your dow dict. Use the map method of pandas.Series instead:

from pyspark.sql.functions import *
import pandas as pd

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)

@pandas_udf("string")
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return day.map(dow) + '-' + day

df.select(df.avg_users, p_udf(df.day).alias("day")).show()

+---------+-----+
|avg_users|  day|
+---------+-----+
| 282905.5|7-Sun|
| 238195.5|1-Mon|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 227214.0|3-Wed|
+---------+-----+
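
One behavior of `Series.map` worth keeping in mind: a value with no entry in the dict maps to NaN instead of raising, so the concatenated label for that row is missing too. Below is a minimal pandas-only sketch, assuming a placeholder prefix is acceptable for unknown days; the `"0"` placeholder and the `label_days` name are assumptions made for illustration, not part of the answer above:

```python
import pandas as pd

DOW = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
       "Fri": "5", "Sat": "6", "Sun": "7"}


def label_days(day: pd.Series) -> pd.Series:
    """Prefix each day abbreviation with its weekday number."""
    # map() performs an element-wise dict lookup; unknown keys become NaN,
    # which is replaced here by the assumed placeholder "0".
    number = day.map(DOW).fillna("0")
    return number + "-" + day


labels = label_days(pd.Series(["Sun", "Mon", "Xyz"]))
print(labels.tolist())  # ['7-Sun', '1-Mon', '0-Xyz']
```

The same function body should work inside a function decorated with `@pandas_udf("string")`, since it takes a Series and returns a Series of equal length.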

清秋悲枫 2025-01-22 06:46:53

What about returning a DataFrame using GroupedData, and calling orderBy after you apply the UDF? pandas sort_values is quite problematic within UDFs.

Basically, in the UDF I generate the day numbers using Python and then concatenate them back onto the day column.

import calendar

def sortdf(pdf):
    # Map each day abbreviation to its 1-based weekday number (Mon=1 ... Sun=7),
    # then prepend it to the abbreviation, e.g. 'Mon' -> '1-Mon'.
    day = pdf.day
    day_number = day.map(dict(zip(calendar.day_abbr, range(7)))) + 1
    return pdf.assign(day=day_number.astype(str) + '-' + day)

df.groupby('avg_users').applyInPandas(sortdf, schema=df.schema).show()

+-----+---------+
|  day|avg_users|
+-----+---------+
|3-Wed| 227214.0|
|1-Mon| 238195.5|
|4-Thu| 264620.0|
|6-Sat| 278482.0|
|7-Sun| 282905.5|
+-----+---------+
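
The transformation can be checked locally with pandas alone before handing it to Spark. This is a sketch under two assumptions: `calendar.day_abbr` yields `Mon` through `Sun` (true in the default C/English locale, but the values are locale-dependent), and every value in the day column is a known abbreviation. The `add_day_prefix` name and `DAY_NUMBER` constant are introduced here for illustration only:

```python
import calendar

import pandas as pd

# Build {'Mon': 1, ..., 'Sun': 7}; assumes the default C/English locale.
DAY_NUMBER = {abbr: i + 1 for i, abbr in enumerate(calendar.day_abbr)}


def add_day_prefix(pdf: pd.DataFrame) -> pd.DataFrame:
    """Prepend the weekday number to each day abbreviation."""
    prefix = pdf["day"].map(DAY_NUMBER).astype(str)
    return pdf.assign(day=prefix + "-" + pdf["day"])


pdf = pd.DataFrame({
    "day": ["Sun", "Mon", "Thu", "Sat", "Wed"],
    "avg_users": [282905.5, 238195.5, 264620.0, 278482.0, 227214.0],
})
# Sorting on the prefixed label orders the rows Monday through Sunday.
result = add_day_prefix(pdf).sort_values("day", ignore_index=True)
print(result["day"].tolist())  # ['1-Mon', '3-Wed', '4-Thu', '6-Sat', '7-Sun']
```

In Spark, the grouped function runs once per group, so the ordering itself would still have to be applied to the resulting DataFrame, for example with `.orderBy("day")` after `applyInPandas`.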