Pyspark使用Pivot的用户保留量不同
我正在尝试使用Pyspark创建一个用户保留表,我可以将其转移到AWS胶水中,以创建一个可以在快速观察的ETL作业中查询的ETL作业。
基本上,我有两个表,一张具有用户注册日期,另一张具有用户活动日期。然后将此注册日期与活动日期进行比较,以计算用户活动的注册后多长时间。此后,我想跟踪在某个月份注册有多少用户,在0、1、2周之后等等。正常的队列表按月分组,然后跟踪,这可能会导致一种情况,即在注册后3个月后,然后在2个月后,用户活动较大。
桌子的剪辑和预期的结果可以在下面看到:
- USER_ID 1有5个活动,第2周,第2周和第6周的第2周,第
- 2周和第1周。ASUS_ID2有5个活动,第0周,第1周,第1周,第1周。 ,第2周和第3周的第1周。Auser_ID3
- 有3个活动,第0周,第1周和第4周的1个活动,
但是,
- 有3个唯一用户(ID:1,2,3)在0八月注册后的几周或晚些时。
- 八月注册后1周或更晚,有3个唯一用户(用户_id:1、2、3)。
- ...
- 八月注册后4周或更晚,有2个唯一用户(用户_id:1,3)。
- 8月注册后的5周或更晚,有1个独特的用户(用户_ID:1)。
- 八月注册后的6周或更晚,有1个独特的用户(User_id:1)。
- 八月注册后的7周或晚些时候,有0个唯一用户。
df_reg = df\
.sort(col('user_id').asc(), col('created_at').asc())\
.groupBy('registered_at_month')\
.agg(countDistinct('user_id').alias('reg'))
在几周内,此后应用枢轴函数来获取表:
retention = []
for week in weeks:
print(week)
df_out = df\
.filter((col('diff_week') >= week))\
.sort(col('user_id').asc(), col('created_at').asc())\
.groupBy('registered_at_month')\
.agg(countDistinct('user_id').alias('countDistinct'))\
.withColumn('week', lit(week))
retention.append(df_out)
df_retention = functools.reduce(DataFrame.union, retention)
df_retention_2 = df_retention\
.groupBy('registered_at_month')\
.pivot('week')\
.agg(first('countDistinct'))\
.orderBy('registered_at_month')
是否有一种更干净的方式?最好没有循环。此外,当输入数据变大时,枢轴功能会永远存在,并且每月注册成千上万的用户和数百周的活动?最后,可以使用一些计算的字段直接在快速观看中直接完成?
非常感谢任何帮助!谢谢!
I'm trying to create a user retention table using Pyspark which I can transfer to AWS Glue to create an ETL job that I can query using Athena in QuickSight.
Basically, I have two tables, one with the user registration date and one with the user activity date. This registration date is then compared with the activity date to calculate how long after the registration the user is active. Thereafter I wanna track how many of the users that was registered on a certain month are active after, 0, 1, 2 weeks etc. I therefore wanna calculate the distinct count of users after week 0, after week 1, etc, i.e. not the normal cohort table where they are grouped by month and then tracked which could result in a scenario where the user activity is larger at 3 months after registration then after 2 months.
A snip of the table and the desired outcome can be seen below:
- user_id 1 has 5 activities, 2 at week 0, 2 at week 2 and 1 at week 6.
- user_id 2 has 5 activities, 1 at week 0, 2 at week 1, 1 at week 2 and 1 at week 3.
- user_id 3 has 3 activities, 1 at week 0, 1 at week 1 and 1 at week 4
However,
- There are 3 unique users (id: 1, 2, 3) seen at 0 weeks or later after registrations in August.
- There are 3 unique users (user_id: 1, 2, 3) seen at 1 weeks or later after registrations in August.
- ...
- There are 2 unique users (user_id: 1, 3) seen at 4 weeks or later after registrations in August.
- There are 1 unique user (user_id: 1) seen at 5 weeks or later after registration in August.
- There are 1 unique user (user_id: 1) seen at 6 weeks or later after registration in August.
- There are 0 unique users seen at 7 weeks or later after registration in August.
To get the number of registrations per month I just do a simple groupBy:
df_reg = df\
.sort(col('user_id').asc(), col('created_at').asc())\
.groupBy('registered_at_month')\
.agg(countDistinct('user_id').alias('reg'))
To get the distinct count of users after each week I apply a filter to the dataframe and loop through the weeks and thereafter apply a pivot function to get the table:
retention = []
for week in weeks:
print(week)
df_out = df\
.filter((col('diff_week') >= week))\
.sort(col('user_id').asc(), col('created_at').asc())\
.groupBy('registered_at_month')\
.agg(countDistinct('user_id').alias('countDistinct'))\
.withColumn('week', lit(week))
retention.append(df_out)
df_retention = functools.reduce(DataFrame.union, retention)
df_retention_2 = df_retention\
.groupBy('registered_at_month')\
.pivot('week')\
.agg(first('countDistinct'))\
.orderBy('registered_at_month')
Is there a cleaner way of doing this? Preferably without a for loop. Also, the pivot function takes forever when the input data gets large and there are thousands of users registered per month and hundreds of weeks of activity? Finally, could this be done directly in QuickSight using some calculated fields?
Very thankful for any help! Thank you!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
是的,有一种更具性能的方法可以做到这一点。在Spark中,当Spark重新组织其执行者之间的数据时,汇总组很昂贵,因为它意味着一个混乱阶段。在当前代码中,您每周要汇总,这意味着您正在执行
n+2
聚合,其中n
是周数:一个用于注册数量用户,n
每周,一个用于枢轴聚合。您可以通过每周在同一聚合中求和,而不是每周求和,然后将其概括。这是代码:
使用
Weeks
包含从0到10的整数的数组,以及以下df
dataframe:您将获得以下
result
output:output:它的性能比您的解决方案
注意:在汇总之前对数据框进行排序是毫无用处的,因为聚合会使数据重新定位。但是,这里没有伤害,因为火花催化剂在聚集之前忽略了那种分类。
Yes, there is a more performant way to do this. In Spark, group by aggregation are expensive as it implies a shuffle phase, when Spark reorganizes data among its executors. In you current code, you're aggregating for each week, meaning you're performing
n+2
aggregations, wheren
is the number of weeks: one for number of registered users,n
for each weeks and one for pivot aggregation.You can reduce this to two aggregations, by summing over each week in the same aggregation, instead of summing per week and then pivot. Here is the code:
With
weeks
array containing the integers from 0 to 10, and the followingdf
dataframe:You get the following
result
output:And it will be more performant than your solution
Note: it is useless to sort your dataframe before aggregation, as aggregation reorders data. However, here there is no harm as Spark Catalyst ignore those kind of sorting before aggregation.