PySpark: using the lag function or another method to match what is implemented in SAS

Posted on 2025-01-27 10:18:49


I am rewriting legacy SAS code in PySpark. In one of those blocks, the SAS code uses the lag function. The way I understand the notes, an ID is a duplicate if it has two intake dates that are less than 4 days apart.

/*Next create flag if the same ID has two intake dates less than 4 days apart*/
/*Data MUST be sorted by ID and DESCENDING IntakeDate!!!*/
data duplicates (drop= lag_ID  lag_IntakeDate);
 set df2;
 by ID;
    lag_ID = lag(ID);
    lag_IntakeDate = lag(IntakeDate);
    if ID = lag_ID then do;
        intake2TIME = intck('day', lag_IntakeDate, IntakeDate);
        end;
    if 0 <= abs(intake2TIME) < 4 then DUPLICATE = 1;
run;
/* If DUPLICATE = 1, then it is a duplicate and eventually will be dropped. */
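
For reference, a minimal PySpark sketch of this SAS step (my translation, not the legacy author's code) might look like the following. It assumes df2 has columns ID and IntakeDate, mirrors the required sort with a window ordered by descending IntakeDate, and lets partitionBy('ID') play the role of the lag_ID comparison:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# window ordered the way SAS expects the data: by ID, DESCENDING IntakeDate
w = Window.partitionBy('ID').orderBy(F.col('IntakeDate').desc())

duplicates = (df2
    # lag() over the window mirrors SAS lag() on the pre-sorted data
    .withColumn('lag_IntakeDate', F.lag('IntakeDate').over(w))
    # SAS intck('day', a, b) corresponds to datediff(b, a) in Spark
    .withColumn('intake2TIME', F.datediff('IntakeDate', 'lag_IntakeDate'))
    # flag rows whose neighbouring intake is less than 4 days away;
    # stays null on the first row of each ID, like the SAS flag
    .withColumn('DUPLICATE', F.when(F.abs(F.col('intake2TIME')) < 4, F.lit(1)))
)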

I tried meeting the condition as described in the comments: I pulled the ID and intake dates via SQL, ordered by ID and descending intake date:

SELECT ID, IntakeDate, col3, col4
FROM df
ORDER BY ID, IntakeDate DESC

I googled the lag equivalent and this is what I found:
https://www.educba.com/pyspark-lag/

However, I have not used window functions before, and the concept the site introduces does not quite click for me. I did try the following to check whether my understanding of WHERE EXISTS might work:

SELECT *
FROM df
WHERE EXISTS (
    SELECT *
    FROM df v2
    WHERE df.ID = v2.ID AND DATEDIFF(df.IntakeDate, v2.IntakeDate) > 4
)
/* not sure about the second condition, though */
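
For what it's worth, the EXISTS idea can be made to match the stated rule with NOT EXISTS, as in the sketch below. This assumes df is registered as a temp view and an active SparkSession named spark; the BETWEEN 1 AND 3 bound both excludes a row matching itself and encodes "a later intake less than 4 days away":

df.createOrReplaceTempView('df')

kept = spark.sql("""
    SELECT *
    FROM df
    WHERE NOT EXISTS (
        SELECT 1
        FROM df v2
        WHERE df.ID = v2.ID
          AND DATEDIFF(v2.IntakeDate, df.IntakeDate) BETWEEN 1 AND 3
    )
""")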

Initial df

+-----------+------------------+
|         Id|        IntakeDate|
+-----------+------------------+
|        5.0|        2021-04-14|
|        5.0|        2021-05-06|
|        5.0|        2021-05-08|
|       10.0|        2021-04-21|
|       10.0|        2021-05-25|
|       14.0|        2021-03-08|
|       14.0|        2021-03-09|
|       14.0|        2021-09-30|
|       14.0|        2022-04-08|
|       15.0|        2021-04-27|
|       15.0|        2021-05-18|
|       15.0|        2022-01-17|
|       26.0|        2021-08-27|
|       26.0|        2021-09-17|
+-----------+------------------+

The expected df has a row dropped when the next IntakeDate is within 3 days (i.e., less than 4 days) of the prior date:

+-----------+------------------+
|         Id|        IntakeDate|
+-----------+------------------+
|        5.0|        2021-04-14|
|        5.0|        2021-05-06| row to drop
|        5.0|        2021-05-08|
|       10.0|        2021-04-21|
|       10.0|        2021-05-25|
|       14.0|        2021-03-08| row to drop
|       14.0|        2021-03-09|
|       14.0|        2021-09-30|
|       14.0|        2022-04-08|
|       15.0|        2021-04-27|
|       15.0|        2021-05-18|
|       15.0|        2022-01-17|
|       26.0|        2021-08-27|
|       26.0|        2021-09-17|
+-----------+------------------+


Comments (1)

丢了幸福的猪 2025-02-03 10:18:50


Please try the following code:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# for each Id, look ahead to the next IntakeDate in ascending date order
lead_over_id = Window.partitionBy('Id').orderBy('IntakeDate')

df = (df
    # next intake date for the same Id (null on the last row of each Id)
    .withColumn('lead_1_date', F.lead('IntakeDate', 1).over(lead_over_id))
    # days from this intake date to the next one
    .withColumn('date_diff', F.datediff('lead_1_date', 'IntakeDate'))
    # keep a row only if the next intake is at least 4 days away
    # (or there is no next intake), matching the SAS "less than 4 days" flag
    .where((F.col('date_diff') >= 4) | F.col('date_diff').isNull())
    .drop('lead_1_date', 'date_diff')
)
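
A quick way to sanity-check this against the sample data from the question (assuming an active SparkSession named spark):

data = [(5.0, '2021-04-14'), (5.0, '2021-05-06'), (5.0, '2021-05-08'),
        (10.0, '2021-04-21'), (10.0, '2021-05-25'),
        (14.0, '2021-03-08'), (14.0, '2021-03-09'),
        (14.0, '2021-09-30'), (14.0, '2022-04-08'),
        (15.0, '2021-04-27'), (15.0, '2021-05-18'), (15.0, '2022-01-17'),
        (26.0, '2021-08-27'), (26.0, '2021-09-17')]
df = (spark.createDataFrame(data, ['Id', 'IntakeDate'])
      .withColumn('IntakeDate', F.to_date('IntakeDate')))
# applying the filter above should remove exactly two rows:
# (5.0, 2021-05-06) and (14.0, 2021-03-08)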