Calculate MAPE on a grouped PySpark DataFrame [pandas_udf]

Published on 2025-01-23


Goal:
Calculate mean_absolute_percentage_error (MAPE) for each unique ID.

  • y - real value
  • yhat - predicted value

Sample PySpark Dataframe: join_df

+----------+----------+-------+---------+----------+----------+
|        ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+----------+----------+-------+---------+----------+----------+
|    Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|    Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|    Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|    Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|    Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|    Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+----------+----------+-------+---------+----------+----------+

final_schema =StructType([
  StructField('ds',DateType()),
  StructField('ID',IntegerType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType()),
  StructField('mape',FloatType())
  ])

I have tried creating a UDF and applying it per ID using the apply function.

from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
  
  mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"]) 
  join_df['mape'] = mape
  
  return join_df

df_apply = join_df.groupby('ID').applyInPandas(gr_mape_val, final_schema)
df_apply.show()

However, I am getting the error:

PythonException: 'TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'numpy.float32'>'

I understand that I am returning the MAPE as a numpy value when it should be a DataFrame. But I am not sure what exactly needs to be done differently in order to get the MAPE for each ID.


Comments (1)

半世蒼涼 2025-01-30 05:05:05


You need to return a DataFrame with PandasUDFType.GROUPED_MAP; since you are returning a numpy scalar, you see the exception.

You also need to modify the schema so it matches the DataFrame your grouped function actually returns.

Also, you should use applyInPandas, since PandasUDFType.GROUPED_MAP is deprecated as of Spark 3.0; I have added its usage as well.
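If you only need a single MAPE value per ID rather than repeating it on every row, the grouped function can instead return a one-row DataFrame. This is a sketch of that variant (the function name `gr_mape_only` and the two-column output are my own, not from the original code); MAPE is computed by hand as mean(|y - yhat| / |y|), which matches sklearn's definition, so the sketch runs without sklearn:

```python
import pandas as pd

def gr_mape_only(pdf: pd.DataFrame) -> pd.DataFrame:
    # MAPE = mean(|y - yhat| / |y|) -- the same definition sklearn uses
    mape = ((pdf['y'] - pdf['yhat']).abs() / pdf['y'].abs()).mean()
    # One output row per group: the group's ID and its MAPE
    return pd.DataFrame({'ID': [pdf['ID'].iloc[0]], 'mape': [float(mape)]})

# With Spark (assumes `sparkDF` from the data-preparation step below):
# mape_schema = StructType([StructField('ID', StringType()),
#                           StructField('mape', FloatType())])
# sparkDF.groupby('ID').applyInPandas(gr_mape_only, mape_schema).show()
```

Each group then contributes exactly one row to the result instead of N rows carrying the same value.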

Data Preparation

from io import StringIO

import pandas as pd

s = StringIO("""
ID,ds,y,yhat,yhat_upper,yhat_lower
Ax849b,2021-07-01,1165.59, 1298.809, 1939.1261, 687.48206
Ax849b,2021-07-02,1120.69, 1295.552, 1892.4929,   693.786
Ax849b,2021-07-03,1120.69, 1294.079, 1923.0253,  664.1514
Ax849b,2021-07-04,1120.69,1295.0399, 1947.6392,  639.4879
Bz383J,2021-07-03,1108.71,1159.4934, 1917.6515, 652.76624
Bz383J,2021-07-04,1062.77,1191.2385, 1891.9268,  665.9529
""")

df = pd.read_csv(s, delimiter=',')

# `sql` here is an existing SparkSession
sparkDF = sql.createDataFrame(df)

sparkDF.show()

+------+----------+-------+---------+----------+----------+
|    ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+------+----------+-------+---------+----------+----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+------+----------+-------+---------+----------+----------+

Pandas UDF - Usage

from pyspark.sql import functions as F
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from sklearn.metrics import mean_absolute_percentage_error

final_schema = StructType([
  StructField('ID', StringType()),
  StructField('ds', StringType()),
  StructField('y', FloatType()),
  StructField('yhat', FloatType()),
  StructField('yhat_lower', FloatType()),
  StructField('yhat_upper', FloatType()),
  StructField('mape', FloatType())
  ])

@F.pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    # sklearn returns a scalar; assigning it broadcasts it onto every row of the group
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

sparkDF.groupby('ID').apply(gr_mape_val).show()

+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+

applyInPandas

final_schema =StructType([
  StructField('ID',StringType()),
  StructField('ds',StringType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_lower',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('mape',FloatType())
  ])


# No decorator is needed here; the schema is passed to applyInPandas instead
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

sparkDF.groupby('ID').applyInPandas(gr_mape_val, final_schema).show()

+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
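For completeness: when only the per-ID MAPE is needed, a pandas UDF is not required at all; a native aggregation does the same job. The Spark lines below are a sketch (assuming `sparkDF` from above and `F` = `pyspark.sql.functions`), cross-checked with a plain-pandas equivalent:

```python
import pandas as pd

# Native Spark version (sketch, commented out so the snippet is self-contained):
# from pyspark.sql import functions as F
# sparkDF.groupby('ID').agg(
#     F.avg(F.abs((F.col('y') - F.col('yhat')) / F.col('y'))).alias('mape')
# ).show()

# Plain-pandas equivalent of the same aggregation:
pdf = pd.DataFrame({'ID': ['A', 'A', 'B'],
                    'y': [100.0, 200.0, 50.0],
                    'yhat': [110.0, 180.0, 55.0]})
mape_by_id = (pdf.assign(ape=(pdf['y'] - pdf['yhat']).abs() / pdf['y'].abs())
                 .groupby('ID')['ape'].mean())
```

Avoiding the UDF keeps the whole computation inside Spark's optimizer and skips the Arrow serialization round-trip.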