Dask dataframe parallel tasks

Posted 2025-02-02 20:58:13

I want to create features (additional columns) from a dataframe, and I have the following structure for many such functions.

Following this documentation, https://docs.dask.org/en/stable/delayed-best-practices.html, I have come up with the code below.

However, I get the error concurrent.futures._base.CancelledError, and many times I also get the warning: distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)

I understand that the objects I am appending to delay are very large (it works fine when I use the commented-out df), which is why the program crashes, but is there a better way of doing this?

import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd 
import numpy as np
import dask


def main():
    #df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000), "col2": np.random.randint(101, 200, 100000), "col3": np.random.uniform(0, 4, 100000)})
    df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000000), "col2": np.random.randint(101, 200, 100000000), "col3": np.random.uniform(0, 4, 100000000)})

    ddf = dd.from_pandas(df, npartitions=100)

    ddf = ddf.set_index("col1")
    delay = []
    
    
    def create_col_sth():
        
        group = ddf.groupby("col1")["col3"]
        
        @dask.delayed
        def small_fun(lag):
            return f"col_{lag}", group.transform(lambda x: x.shift(lag), meta=('x', 'float64')).apply(lambda x: np.log(x), meta=('x', 'float64'))


        for lag in range(5):
            x = small_fun(lag)
            delay.append(x)
        
    create_col_sth()
    delayed = dask.compute(*delay)

    for data in delayed:
        ddf[data[0]] = data[1]
        
    ddf.to_parquet("test", engine="fastparquet")


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=6, 
                    threads_per_worker=2,
                    memory_limit='8GB')
    client = Client(cluster)
    main()

Comments (2)

无可置疑 2025-02-09 20:58:13

Not sure if this will resolve all of your issues, but generally you don't need to (and shouldn't) mix delayed and dask.dataframe operations like this. Additionally, you shouldn't pass large data objects into delayed functions through closures, like group in your example. Instead, pass them in as explicit arguments, or, in this case, don't use delayed at all and use native dask.dataframe operations or in-memory (pandas) operations via dask.dataframe.map_partitions.

Implementing these, I would rewrite your main function as follows:

df = pd.DataFrame({
    "col1": np.random.randint(1, 100, 100000000),
    "col2": np.random.randint(101, 200, 100000000),
    "col3": np.random.uniform(0, 4, 100000000),
})

ddf = dd.from_pandas(df, npartitions=100)
ddf = ddf.set_index("col1")

group = ddf.groupby("col1")["col3"]

# directly assign the dataframe operations as columns
for lag in range(5):
    # bind lag as a default argument: these lambdas are evaluated lazily at
    # compute time, so a plain closure would only see the final value of lag
    ddf[f"col_{lag}"] = (
        group
        .transform(lambda x, lag=lag: x.shift(lag), meta=('x', 'float64'))
        .apply(lambda x: np.log(x), meta=('x', 'float64'))
    )

# this triggers the operation implicitly - no need to call compute
ddf.to_parquet("test", engine="fastparquet")

反话 2025-02-09 20:58:13

After a long period of frustration with Dask, I think I've cracked the holy grail of refactoring pandas transformations wrapped with Dask.

Learning points:

  1. Index intelligently. If you are grouping by or merging, consider indexing on the columns you use for those operations.

  2. Partition and repartition intelligently. If you have one dataframe of 10k rows and another of 1M rows, they should naturally have different numbers of partitions.

  3. Don't use Dask dataframe transformation methods except for things like merge. Everything else should be pandas code wrapped in map_partitions.

  4. Don't let the task graph grow too large, so consider saving to disk after, for example, setting an index or performing a complex transformation.

  5. If possible, filter the dataframe and work with a smaller subset; you can always merge it back into the bigger dataset.

  6. If you are working on your local machine, set the memory limits within the boundaries of your system specification. This point is very important. In the example below I create one million rows of 3 columns: two are int64 and one is float64, each 8 bytes per row, so 24 bytes per row and about 24 million bytes in total (see the quick check right after this list).
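
As a quick sanity check of that arithmetic, a small illustrative snippet (separate from the pipeline below, and assuming randint yields int64 as it does on most 64-bit platforms):

import numpy as np
import pandas as pd

size = 1000000
df = pd.DataFrame({
    "col1": np.random.randint(1, 10000, size),    # int64: 8 bytes per row
    "col2": np.random.randint(101, 20000, size),  # int64: 8 bytes per row
    "col3": np.random.uniform(0, 100, size),      # float64: 8 bytes per row
})
# 3 columns * 8 bytes * 1,000,000 rows = 24,000,000 bytes, plus a tiny RangeIndex
print(df.memory_usage(index=True).sum())

The full example: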

import gc

import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import dask


# https://stackoverflow.com/questions/52642966/repartition-dask-dataframe-to-get-even-partitions
def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)


def main(client):
    size = 1000000

    df = pd.DataFrame({"col1": np.random.randint(1, 10000, size), "col2": np.random.randint(101, 20000, size), "col3": np.random.uniform(0, 100, size)})

    # Select appropriate partitions
    ddf = dd.from_pandas(df, npartitions=500)
    del df
    gc.collect()
    # If you want to group by a certain column, it is best for that column to be the index
    ddf = ddf.set_index("col1")

    ddf = _rebalance_ddf(ddf)
    print(ddf.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf.memory_usage(deep=True).sum().compute())

    # Persist intermediate results to disk to keep the task graph small; if you omit this step, processing will fail
    ddf.to_parquet("test", engine="fastparquet")
    
    ddf = dd.read_parquet("test")

    
    # Dummy code to create a dataframe to be merged based on col1
    ddf2 = ddf[["col2", "col3"]]
    ddf2["col2/col3"] = ddf["col2"] / ddf["col3"] 
    ddf2 = ddf2.drop(columns=["col2", "col3"])
    
    # Repartition the data
    ddf2 = _rebalance_ddf(ddf2)
    print(ddf2.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf2.memory_usage(deep=True).sum().compute())

    def mapped_fun(data):
        for lag in range(5):
            data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
        return data

    # Do the group-by transformation in pandas, wrapped with Dask via map_partitions;
    # using the Dask group-by functions directly here leads to a variety of issues.
    ddf = ddf.map_partitions(mapped_fun)

    # Additionally, you can merge ddf with ddf2, but merge on an indexed column, otherwise you run into a variety of issues
    ddf = ddf.merge(ddf2, on=['col1'], how="left")

    ddf.to_parquet("final", engine="fastparquet")


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=6, 
                    threads_per_worker=2,
                    memory_limit='8GB')
    client = Client(cluster)
    main(client)