by ID组并根据Pyspark中的优先级创建列

发布于 2025-01-21 08:33:51 字数 1697 浏览 0 评论 0原文

有人可以帮我下面吗? 我有一个输入数据框架。

IDProcess_Typestp_stagewise
1Loan_Creation手册
1贷款NSTP
1赔偿STP
2Loan_CreationSTP
2报销NSTP
3Loan_Creation4
3Loan_Creation screat_creation4
Loan_CreationLoan_creationManuual
4Loan_CreationNSTP

输出

1ID_CREATIIONinnsstp
1loat_creationSTPMan
1stpSTP手册
报销dotionSTPSTP
2LOAN_CREATIONSTPSTP
2报销NSTPNSTP
3Loan_Creation手册MANUL
3LOAN_CREATIONSTPNSTP
4LOAN_CREATIONNSTP NSTPNSTP NSTP NSTP NSTP
NSTPNSTP NSTP NSTP NSTP需要

分组ID和Process_type列和PRISTIS_TYPE列,并优先列表,MARAUL>>>>>>>> nstp>> STP并创建另一列。

有人可以提供解决这个问题的方法。提前致谢。

略有更改与ID一起,也应在过程类型上完成组。

Can someone help me with the below.
I have an input dataframe.

IDprocess_typeSTP_stagewise
1loan_creationManual
1loan creationNSTP
1reimbursementSTP
2loan_creationSTP
2reimbursementNSTP
3loan_creationManual
3loan_creationSTP
4loan_creationManual
4loan_creationNSTP

Output dataframe required:

IDprocess_typeSTP_stagewiseSTP_type
1loan_creationManualManual
1loan creationNSTPManual
1reimbursementSTPSTP
2loan_creationSTPSTP
2reimbursementNSTPNSTP
3loan_creationManualManual
3loan_creationSTPManual
4loan_creationNSTPNSTP
4loan_creationNSTPNSTP

I need to groupby id and process_type column and prioritize, Manual >> NSTP >> STP and create a different column.

Can someone provide an approach to solve this. Thanks in Advance.

Slight change along with ID, group by should be done on process type also.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

渔村楼浪 2025-01-28 08:33:51

您可以解决此问题的一种方法是在ID中汇总并将所有独特的stp_stagewise收集到列表中,然后用custom_sort_map对其进行排序以获取第一个索引元素,最后将其加入您的主要数据帧

数据准备

s = StringIO("""
ID  STP_stagewise
1   Manual
1   NSTP
1   STP
2   STP
2   NSTP
3   Manual
3   STP
4   Manual
4   NSTP
""")

df = pd.read_csv(s,delimiter='\t')

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+---+-------------+
| ID|STP_stagewise|
+---+-------------+
|  1|       Manual|
|  1|         NSTP|
|  1|          STP|
|  2|          STP|
|  2|         NSTP|
|  3|       Manual|
|  3|          STP|
|  4|       Manual|
|  4|         NSTP|
+---+-------------+

聚合 - 收集设置&排序

custom_sort_map = {'Manual':0,'NSTP':1,'STP':2}

udf_custom_sort = F.udf(lambda x: sorted(x,key=lambda x:custom_sort_map[x]), ArrayType(StringType()))

stpAgg = sparkDF.groupBy(F.col('ID')).agg(F.collect_set(F.col('STP_stagewise')).alias('STP_stagewise_set'))\
                .withColumn('sorted_STP_stagewise_set',udf_custom_sort('STP_stagewise_set'))\
                .withColumn('STP_type',F.col('sorted_STP_stagewise_set').getItem(0))

stpAgg.show()

+---+-------------------+------------------------+--------+
| ID|  STP_stagewise_set|sorted_STP_stagewise_set|STP_type|
+---+-------------------+------------------------+--------+
|  1|[STP, NSTP, Manual]|     [Manual, NSTP, STP]|  Manual|
|  3|      [STP, Manual]|           [Manual, STP]|  Manual|
|  2|        [STP, NSTP]|             [NSTP, STP]|    NSTP|
|  4|     [NSTP, Manual]|          [Manual, NSTP]|  Manual|
+---+-------------------+------------------------+--------+

加入

sparkDF = sparkDF.join(stpAgg
                       ,sparkDF['ID'] == stpAgg['ID']
                       ,'inner'
                      ).select(sparkDF['*'],stpAgg['STP_type'])

sparkDF.show()

+---+-------------+--------+
| ID|STP_stagewise|STP_type|
+---+-------------+--------+
|  1|       Manual|  Manual|
|  1|         NSTP|  Manual|
|  1|          STP|  Manual|
|  3|       Manual|  Manual|
|  3|          STP|  Manual|
|  2|          STP|    NSTP|
|  2|         NSTP|    NSTP|
|  4|       Manual|  Manual|
|  4|         NSTP|  Manual|
+---+-------------+--------+

One way you can solve this is by aggregating at id and collecting all the distinct STP_stagewise into a list and sorting it with a custom_sort_map to get the first index element and finally joining it back to your main DataFrame

Data Preparation

s = StringIO("""
ID  STP_stagewise
1   Manual
1   NSTP
1   STP
2   STP
2   NSTP
3   Manual
3   STP
4   Manual
4   NSTP
""")

df = pd.read_csv(s,delimiter='\t')

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+---+-------------+
| ID|STP_stagewise|
+---+-------------+
|  1|       Manual|
|  1|         NSTP|
|  1|          STP|
|  2|          STP|
|  2|         NSTP|
|  3|       Manual|
|  3|          STP|
|  4|       Manual|
|  4|         NSTP|
+---+-------------+

Aggregation - Collect Set & Sort

custom_sort_map = {'Manual':0,'NSTP':1,'STP':2}

udf_custom_sort = F.udf(lambda x: sorted(x,key=lambda x:custom_sort_map[x]), ArrayType(StringType()))

stpAgg = sparkDF.groupBy(F.col('ID')).agg(F.collect_set(F.col('STP_stagewise')).alias('STP_stagewise_set'))\
                .withColumn('sorted_STP_stagewise_set',udf_custom_sort('STP_stagewise_set'))\
                .withColumn('STP_type',F.col('sorted_STP_stagewise_set').getItem(0))

stpAgg.show()

+---+-------------------+------------------------+--------+
| ID|  STP_stagewise_set|sorted_STP_stagewise_set|STP_type|
+---+-------------------+------------------------+--------+
|  1|[STP, NSTP, Manual]|     [Manual, NSTP, STP]|  Manual|
|  3|      [STP, Manual]|           [Manual, STP]|  Manual|
|  2|        [STP, NSTP]|             [NSTP, STP]|    NSTP|
|  4|     [NSTP, Manual]|          [Manual, NSTP]|  Manual|
+---+-------------------+------------------------+--------+

Join

sparkDF = sparkDF.join(stpAgg
                       ,sparkDF['ID'] == stpAgg['ID']
                       ,'inner'
                      ).select(sparkDF['*'],stpAgg['STP_type'])

sparkDF.show()

+---+-------------+--------+
| ID|STP_stagewise|STP_type|
+---+-------------+--------+
|  1|       Manual|  Manual|
|  1|         NSTP|  Manual|
|  1|          STP|  Manual|
|  3|       Manual|  Manual|
|  3|          STP|  Manual|
|  2|          STP|    NSTP|
|  2|         NSTP|    NSTP|
|  4|       Manual|  Manual|
|  4|         NSTP|  Manual|
+---+-------------+--------+
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文