Calculate MAPE and apply it to a grouped PySpark DataFrame [pandas_udf]
Goal: Calculate the mean_absolute_percentage_error (MAPE) for each unique ID.
y - real value
yhat - predicted value
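For reference, sklearn's mean_absolute_percentage_error returns one fraction per call: the mean of |y - yhat| / |y| (not multiplied by 100). A quick hand check on the first two Ax849b rows from the table below:

import numpy as np

# mean(|y - yhat| / |y|), one scalar per group of rows
y = np.array([1165.59, 1120.69])
yhat = np.array([1298.809, 1295.552])
mape = np.mean(np.abs(y - yhat) / np.abs(y))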
Sample PySpark DataFrame: join_df
+----------+----------+-------+---------+----------+----------+
| ID| ds| y| yhat|yhat_upper|yhat_lower|
+----------+----------+-------+---------+----------+----------+
| Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
| Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929| 693.786|
| Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253| 664.1514|
| Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392| 639.4879|
| Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
| Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268| 665.9529|
+----------+----------+-------+---------+----------+----------+
final_schema = StructType([
    StructField('ds', DateType()),
    StructField('ID', IntegerType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('mape', FloatType())
])
I have tried creating a UDF and applying it on the IDs using the apply function.
from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

df_apply = join_df.groupby('ID').applyInPandas(gr_mape_val, final_schema)
df_apply.show()
However, I am getting the error:
PythonException: 'TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'numpy.float32'>'
I understand that I am getting the MAPE as a numpy output when it should be a DataFrame. But I am not sure what exactly needs to be done differently in order to get the MAPE for each ID.
Comments (1)
You need to return a DataFrame with PandasUDFType.GROUPED_MAP; since you are returning a numpy value, you see the exception. You also need to modify the schema so that it matches the final DataFrame returned from the grouped function.
Also, you should use applyInPandas; I have added its usage below as well.
Data Preparation
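(The code listings from the original answer are not included in this copy; the snippets that follow are minimal sketches of what they might look like.) First, a sketch that recreates the sample join_df, assuming an active SparkSession called spark:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Rows copied from the sample table above; note that ID is a string.
rows = [
    ("Ax849b", "2021-07-01", 1165.59, 1298.809, 1939.1261, 687.48206),
    ("Ax849b", "2021-07-02", 1120.69, 1295.552, 1892.4929, 693.786),
    ("Ax849b", "2021-07-03", 1120.69, 1294.079, 1923.0253, 664.1514),
    ("Ax849b", "2021-07-04", 1120.69, 1295.0399, 1947.6392, 639.4879),
    ("Bz383J", "2021-07-03", 1108.71, 1159.4934, 1917.6515, 652.76624),
    ("Bz383J", "2021-07-04", 1062.77, 1191.2385, 1891.9268, 665.9529),
]

join_df = (
    spark.createDataFrame(rows, ["ID", "ds", "y", "yhat", "yhat_upper", "yhat_lower"])
    .withColumn("ds", F.to_date("ds"))
)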
Pandas UDF - Usage
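Next, a sketch of the grouped function: a plain Python function that receives one pandas DataFrame per ID and returns a pandas DataFrame rather than the scalar MAPE. The schema shown is an assumption: StringType for ID (the sample IDs such as Ax849b are strings, not integers) and DoubleType for the numeric columns, since pandas produces float64:

import pandas as pd
from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.types import (
    StructType, StructField, DateType, StringType, DoubleType
)

# Output schema for the returned pandas DataFrame (assumed types, see above).
final_schema = StructType([
    StructField("ID", StringType()),
    StructField("ds", DateType()),
    StructField("y", DoubleType()),
    StructField("yhat", DoubleType()),
    StructField("yhat_upper", DoubleType()),
    StructField("yhat_lower", DoubleType()),
    StructField("mape", DoubleType()),
])

def gr_mape_val(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows for a single ID; compute that group's MAPE once,
    # attach it as a column, and return the whole DataFrame, not the scalar.
    pdf["mape"] = mean_absolute_percentage_error(pdf["y"], pdf["yhat"])
    return pdf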
applyInPandas
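Finally, a sketch of applying the function per group with applyInPandas in place of the @pandas_udf / GROUPED_MAP decorator:

# Group by ID and apply the pandas function; every row of a group carries the
# same mape value, so one value per ID can be read off with a distinct select.
df_apply = join_df.groupBy("ID").applyInPandas(gr_mape_val, schema=final_schema)
df_apply.select("ID", "mape").distinct().show()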