dask: resampling with different aggregation rules
I am working with panel data (i.e., a panel of IDs and time periods) in Dask and wish to resample the frequency from microseconds to 30 seconds. Sample data looks like this:
size price ID
datetime
2018-09-26 13:50:00.000600 300 17.8185 AR
2018-09-26 13:50:00.004797 25 37.1165 BCOR
2018-09-26 13:50:00.005955 300 17.8185 AR
2018-09-26 13:50:00.006066 100 78.6200 XLI
2018-09-26 13:50:00.006862 100 73.0600 ABT
2018-09-26 13:50:00.007164 100 73.0600 ABT
2018-09-26 13:50:00.008643 100 73.3332 FAS
2018-09-26 13:50:00.008762 100 73.0600 ABT
2018-09-26 13:50:00.008793 2 114.4950 MSFT
2018-09-26 13:50:00.008978 100 20.6350 NWL
where ID is a string, datetime is a datetime object (currently set as the index), size is int64, and price is float64. I want to:
- groupby ID
- resample onto a 30-second frequency
- while aggregating price by its mean and aggregating size by its sum. Essentially, aggregate columns by different functions (in plain pandas this would look roughly like the sketch after this list).
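For reference, with datetime as the index as above, the target operation in plain pandas would be roughly the following illustrative sketch:

# plain-pandas version of the desired groupby-resample aggregation (illustrative)
df.groupby('ID').resample('30S').agg({'price': 'mean', 'size': 'sum'})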
I understand that Dask doesn't support groupby-resample operations, but based on an excellent post here, it seems doable using a mix of dask and pandas.
My current attempt (based on the linked post above) is:
def per_group(blk):
    return blk.resample('30S').agg({blk['price']: np.mean, blk['size']: np.sum})

ddf.groupby('ID').apply(per_group, meta=ddf).compute()
but it returns TypeError: 'Series' objects are mutable, thus they cannot be hashed. My sense is that it has something to do with the 'ID' column, but I can't figure it out. I also tried supplying meta={'size': np.int64, 'price': np.float64, 'ID': 'object'} instead, but to no avail.
Would love to see any other way this could be done more efficiently! Thanks.
Comments (1)
To use .resample, the index should be a datetime (or another suitable dtype). One solution is to modify the aggregation function so that it sets the datetime index itself (another is to index by datetime in advance). The full reproducible snippet:
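A minimal sketch along these lines, assuming the sample rows from the question are loaded with datetime kept as a regular column so that each group can set its own datetime index before resampling:

import pandas as pd
import dask.dataframe as dd
from io import StringIO

# a few of the sample rows from the question, datetime kept as a column
data = StringIO("""datetime,size,price,ID
2018-09-26 13:50:00.000600,300,17.8185,AR
2018-09-26 13:50:00.004797,25,37.1165,BCOR
2018-09-26 13:50:00.005955,300,17.8185,AR
2018-09-26 13:50:00.006066,100,78.6200,XLI
2018-09-26 13:50:00.006862,100,73.0600,ABT
2018-09-26 13:50:00.007164,100,73.0600,ABT""")

df = pd.read_csv(data, parse_dates=['datetime'])
ddf = dd.from_pandas(df, npartitions=2)

def per_group(blk):
    # blk is a plain pandas DataFrame here: set the datetime index first,
    # then aggregate each column with its own function
    return blk.set_index('datetime').resample('30S').agg(
        {'price': 'mean', 'size': 'sum'})

result = ddf.groupby('ID').apply(
    per_group,
    meta={'price': 'f8', 'size': 'i8'},  # or build meta from a small pandas sample
).compute()
print(result)

Note that the keys of the agg dict are the column names ('price', 'size'); passing the Series objects blk['price'] / blk['size'] as keys is what raised the "'Series' objects are mutable, thus they cannot be hashed" TypeError in the original attempt.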