使用火花窗口功能在30天间隔间隔
我有此数据框架:
df = (
spark
.createDataFrame([
[20210101, 'A', 103, "abc"],
[20210101, 'A', 102, "def"],
[20210101, 'A', 101, "def"],
[20210102, 'A', 34, "ghu"],
[20210101, 'B', 180, "xyz"],
[20210102, 'B', 123, "kqt"]
]
).toDF("txn_date", "txn_type", "txn_amount", "other_attributes")
)
每个日期都有多个不同类型的交易。我的任务是计算每个记录的标准偏差(相同类型和30天)。
最明显的方法(我尝试过)是根据类型创建一个窗口,并包含可以追溯到30天的记录。
days = lambda i: i * 86400
win = Window.partitionBy("txn_type").orderBy(F.col("txn_date").cast(LongType())).rangeBetween(-days(30), 0)
df = df.withColumn("stddev_last_30days", F.stddev(F.col("txn_amount")).over(win))
由于某些交易类型每天具有数百万美元的交易,因此将其呈OOM。
我尝试以部分进行(一次只需几个记录记录),但这会导致错误计算,因为标准偏差不是附加的。
我还尝试了用于交易类型和日期的所有记录的“ collect_set”(因此,所有金额作为一个列中的数组都出现在一个列中),但这也呈OOM。
我尝试一次处理一个月(因为我需要回去1个月,所以至少需要2个月的数据),但即使那样,我的执行者也不知所措。
解决此问题的可扩展方法是什么?
注意:
在原始数据中,列
txn_date
存储在“ yyyymmdd”格式中。数据框架中还有其他列对于每个日期和类型可能不一样。为了简单起见,我还没有将它们包括在示例代码中。
I have this data frame:
df = (
spark
.createDataFrame([
[20210101, 'A', 103, "abc"],
[20210101, 'A', 102, "def"],
[20210101, 'A', 101, "def"],
[20210102, 'A', 34, "ghu"],
[20210101, 'B', 180, "xyz"],
[20210102, 'B', 123, "kqt"]
]
).toDF("txn_date", "txn_type", "txn_amount", "other_attributes")
)
Each date has multiple transactions of each of the different types. My task is to compute the standard deviation of the amount for each record (for the same type and going back 30 days).
The most obvious approach (that I tried) is to create a window based on type and include records going back to past 30 days.
days = lambda i: i * 86400
win = Window.partitionBy("txn_type").orderBy(F.col("txn_date").cast(LongType())).rangeBetween(-days(30), 0)
df = df.withColumn("stddev_last_30days", F.stddev(F.col("txn_amount")).over(win))
Since some of the transaction types have millions of transactions per day, this runs into OOM.
I tried doing it in parts (take only few records for each date at a time) but this leads to error prone calculations since standard deviation is not additive.
I also tried 'collect_set' for all records for a transaction type and date (so all amounts come in as an array in one column), but this runs into OOM as well.
I tried processing one month at a time (I need at a minimum 2 months data since I need to go back 1 month) but even that overwhelms my executors.
What would be a scalable way to solve this problem?
Notes:
In the original data, column
txn_date
is stored as long in "yyyyMMdd" format.There are other columns in the data frame that may or may not be same for each date and type. I haven't included them in the sample code for simplicity.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
过滤
删除不需要的数据总是很好。您说您只需要过去60天,所以您可以
过滤
out不需要的东西。这条线只会将行只保留,日期不超过60个最后几天(直到今天):
为了说明其他问题,我现在不使用它。
窗口
第一个简单的事情,如果它已经以较长的格式,您无需再次施放长时间,因此我们可以删除
.cast(longtype())
。另一个, 大事 ,是您的窗户的下限是错误的。看,让我们在输入中再添加一条线:
该行表示1999年的日期。添加了行后,运行代码,我们得到了:
您可以看到2021年的STDDEV也受到影响,因此也受到影响,因此30白天窗口不起作用,您的窗口实际上 all 可以使用它。我们可以检查日期
20210101
的下限是什么:这可能是您最大的问题。 您永远不要尝试超越日期和时间。始终使用专门用于日期和时间的功能。
您可以使用此窗口:
Filtering
It's always good to remove data which is not needed. You said you need just last 60 days, so You could
filter
out what's not needed.This line would keep only rows with date not older than 60 last days (until today):
I'll not use it now in order to illustrate other issues.
Window
The first simple thing, if it's already in long format, you don't need to cast to long again, so we can remove
.cast(LongType())
.The other, big thing, is that your window's lower bound is wrong. Look, let's add one more line to the input:
The line represents the date from the year 1999. After the line was added, running the code, we get this:
You can see that stddev for 2021 year lines was also affected, so 30 day window does not work, your window actually takes all the data it can. We can check what is the lower bound for date
20210101
:Probably this was your biggest problem. You should never try to outsmart dates and times. Always use functions specialized for dates and times.
You can use this window:
unix_timestamp
can transform your 'yyyyMMdd' format into a proper long-format number (UNIX time in seconds). From this, now you can subtract seconds (30 days worth of seconds).