dask: different aggregation rules

Posted on 2025-01-31 13:32:56


I am working with panel data (i.e., a panel of IDs and time periods) in Dask and wish to resample the frequency from microseconds to 30 seconds. Sample data looks like this:

                            size     price       ID
datetime                                           
2018-09-26 13:50:00.000600   300   17.8185       AR
2018-09-26 13:50:00.004797    25   37.1165     BCOR
2018-09-26 13:50:00.005955   300   17.8185       AR
2018-09-26 13:50:00.006066   100   78.6200      XLI
2018-09-26 13:50:00.006862   100   73.0600      ABT
2018-09-26 13:50:00.007164   100   73.0600      ABT
2018-09-26 13:50:00.008643   100   73.3332      FAS
2018-09-26 13:50:00.008762   100   73.0600      ABT
2018-09-26 13:50:00.008793     2  114.4950     MSFT
2018-09-26 13:50:00.008978   100   20.6350      NWL

where ID is a string, datetime is a datetime object (currently set as the index), size is int64, and price is float64. I want to:

  1. groupby ID
  2. resample onto a 30-second frequency
  3. while aggregating price by its mean and aggregating size by its sum. Essentially, aggregate columns by different functions.
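
For reference, in plain pandas (with the data in memory and the datetime index shown above, here called pdf) steps 1-3 would just be:

# pandas-only version of the three steps above, for comparison
pdf.groupby('ID').resample('30S').agg({'price': 'mean', 'size': 'sum'})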

I understand that Dask doesn't support groupby-resample operations, but based on an excellent post here, it seems doable using a mix of dask and pandas.

My current attempt (based on the linked post above) is:

def per_group(blk):
    return blk.resample('30S').agg({blk['price']: np.mean, blk['size']: np.sum})

ddf.groupby('ID').apply(per_group, meta=ddf).compute() 

but it returns TypeError: 'Series' objects are mutable, thus they cannot be hashed. My sense is that it has something to do with the 'ID' column but I can't figure it out. I also tried supplying meta={'size': np.int64, 'price': np.float64, 'ID': 'object'} instead but to no avail.

Would love to see any other way this could be done more efficiently! Thanks.

Comments (1)

对岸观火 2025-02-07 13:32:56


To use .resample, the index should be a datetime (or another suitable dtype). One solution is to modify the aggregation function so that it sets the datetime index itself (another is to index the DataFrame by datetime in advance):

def per_group(df):
    return (
        df
        .set_index("datetime")                   # .resample needs a datetime index
        .resample("30S")                         # 30-second bins
        .agg({"price": "mean", "size": "sum"})   # mean price, total size per bin
    )

ddf.groupby("ID").apply(per_group).compute()

The full reproducible snippet:

from io import StringIO

from dask.dataframe import from_pandas
from pandas import read_fwf, to_datetime

data = StringIO(
    """
datetime                    size     price       ID
2018-09-26 13:50:00.000600   300   17.8185       AR
2018-09-26 13:50:00.004797    25   37.1165     BCOR
2018-09-26 13:50:00.005955   300   17.8185       AR
2018-09-26 13:50:00.006066   100   78.6200      XLI
2018-09-26 13:50:00.006862   100   73.0600      ABT
2018-09-26 13:50:00.007164   100   73.0600      ABT
2018-09-26 13:50:00.008643   100   73.3332      FAS
2018-09-26 13:50:00.008762   100   73.0600      ABT
2018-09-26 13:50:00.008793     2  114.4950     MSFT
2018-09-26 13:50:00.008978   100   20.6350      NWL
"""
)

df = read_fwf(data)
# read_fwf splits the timestamp into a date column ("datetime") and a time
# column ("Unnamed: 1"); glue them back together and parse as datetime.
df["datetime"] = df["datetime"] + " " + df["Unnamed: 1"]
df["datetime"] = to_datetime(df["datetime"])

ddf = from_pandas(df, npartitions=2)


def per_group(df):
    # Per ID: set the datetime index, resample to 30 s, then aggregate
    # price by mean and size by sum.
    return (
        df.set_index("datetime").resample("30S").agg({"price": "mean", "size": "sum"})
    )


ddf.groupby("ID").apply(per_group).compute()