按ID进行汇总,计数日期字段,并在时间字段上汇总

发布于 2025-01-18 17:09:25 字数 1844 浏览 4 评论 0原文

输入数据帧:

IDDATEPERSONAL_AUXOUTBOUND_AUX
11/3/20220:09:000:00:08
11/4/20220:19:000:00:40
11/5/20220:13:000:00:33
11/6/20220:08:000:00:22
11/7/20220:13:000:00:13
21/3/20220:13:500:00:15
21/4/20220: 14:000:00:33
22022 年 1 月 5 日0:09:000:00:21
22022年1月6日0:14:000:00:12
32022年1月3日0:14:500:00:17
32022年1月4日0:15:000: 00:34
31/5/20220:10:000:00:23
32022年1月6日0:15:000:00:14
32022年1月7日 0:14:500:00:17
------------------- ----------------

输出数据帧: 按 ID 分组,将日期计算为工作日期、personal_aux 的总和以及 outbound_aux

ID的总和Day_workedSum_personal_AUXSum_outbound_aux
151:02:000:1:56
240:50:500:1:21
351: 09:000:1:45

有人可以指导在 pyspark 中的时间列上进行聚合吗?提前致谢

Input Dataframe:

IDDATEPERSONAL_AUXOUTBOUND_AUX
11/3/20220:09:000:00:08
11/4/20220:19:000:00:40
11/5/20220:13:000:00:33
11/6/20220:08:000:00:22
11/7/20220:13:000:00:13
21/3/20220:13:500:00:15
21/4/20220:14:000:00:33
21/5/20220:09:000:00:21
21/6/20220:14:000:00:12
31/3/20220:14:500:00:17
31/4/20220:15:000:00:34
31/5/20220:10:000:00:23
31/6/20220:15:000:00:14
31/7/20220:14:500:00:17
-----------------------------------

Output Dataframe:
group by ID, count the Date as date worked, sum of personal_aux and sum of outbound_aux

IDDay_workedSum_personal_AUXSum_outbound_aux
151:02:000:1:56
240:50:500:1:21
351:09:000:1:45

Can someone guide, in doing aggregate over a time column in pyspark. Thanks in Advance

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

相守太难 2025-01-25 17:09:25
from pyspark.sql import functions as F
from pyspark.sql import types as T
import datetime

def secscon(x):     
    sec = x     
    return str(datetime.timedelta(seconds=sec))
    
df = input_df \
    .withColumn('PERSONAL_AUX_SEC', F.unix_timestamp(F.col('PERSONAL_AUX'), 'H:mm:ss')) \
    .withColumn('OUTBOUND_AUX_SEC', F.unix_timestamp(F.col('OUTBOUAND_AUX'), 'H:mm:ss')) \
    .withColumn('ID', F.col('ID').cast('int'))
    .groupBy('ID') \
    .agg(
       F.count('DATE').alias('Days_worked'),
       F.sum('PERSONAL_AUX_SEC').alias('sum_personal_aux').cast('double'),
       F.sum('OUTBOUND_AUX_SEC').alias('sum_outbound_aux').cast('double')
    )) 
    
schema = T.StructType([
    T.StructField('ID',T.IntegerType(),True),
    T.StructField('Days_worked',T.IntegerType(),True),
    T.StructField('sum_personal_aux',T.StringType(),True),
    T.StructField('sum_outbound_aux',T.StringType(), True)
])  
    
df1 = df.rdd.map(lambda x: (x[0], x[1] , secscon(x[2]), secscon(x[3]))).toDF(schema)
from pyspark.sql import functions as F
from pyspark.sql import types as T
import datetime

def secscon(x):     
    sec = x     
    return str(datetime.timedelta(seconds=sec))
    
df = input_df \
    .withColumn('PERSONAL_AUX_SEC', F.unix_timestamp(F.col('PERSONAL_AUX'), 'H:mm:ss')) \
    .withColumn('OUTBOUND_AUX_SEC', F.unix_timestamp(F.col('OUTBOUAND_AUX'), 'H:mm:ss')) \
    .withColumn('ID', F.col('ID').cast('int'))
    .groupBy('ID') \
    .agg(
       F.count('DATE').alias('Days_worked'),
       F.sum('PERSONAL_AUX_SEC').alias('sum_personal_aux').cast('double'),
       F.sum('OUTBOUND_AUX_SEC').alias('sum_outbound_aux').cast('double')
    )) 
    
schema = T.StructType([
    T.StructField('ID',T.IntegerType(),True),
    T.StructField('Days_worked',T.IntegerType(),True),
    T.StructField('sum_personal_aux',T.StringType(),True),
    T.StructField('sum_outbound_aux',T.StringType(), True)
])  
    
df1 = df.rdd.map(lambda x: (x[0], x[1] , secscon(x[2]), secscon(x[3]))).toDF(schema)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文