Dask dataframe parallel tasks
I want to create features (additional columns) from a dataframe, and I have the following structure for many functions.
Following this documentation https://docs.dask.org/en/stable/delayed-best-practices.html I have come up with the code below.
However, I get the error message concurrent.futures._base.CancelledError, and many times I get the warning: distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%).
I understand that the object I am appending to delay is very large (it works OK when I use the commented-out df), which is why the program crashes, but is there a better way of doing it?
import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import dask

def main():
    # df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000), "col2": np.random.randint(101, 200, 100000), "col3": np.random.uniform(0, 4, 100000)})
    df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000000),
                       "col2": np.random.randint(101, 200, 100000000),
                       "col3": np.random.uniform(0, 4, 100000000)})

    ddf = dd.from_pandas(df, npartitions=100)
    ddf = ddf.set_index("col1")

    delay = []

    def create_col_sth():
        group = ddf.groupby("col1")["col3"]

        @dask.delayed
        def small_fun(lag):
            return f"col_{lag}", group.transform(lambda x: x.shift(lag), meta=('x', 'float64')).apply(lambda x: np.log(x), meta=('x', 'float64'))

        for lag in range(5):
            x = small_fun(lag)
            delay.append(x)

    create_col_sth()
    delayed = dask.compute(*delay)

    for data in delayed:
        ddf[data[0]] = data[1]

    ddf.to_parquet("test", engine="fastparquet")

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=6,
                           threads_per_worker=2,
                           memory_limit='8GB')
    client = Client(cluster)
    main()
2 Answers
Not sure if this will resolve all of your issues, but generally you don't need to (and shouldn't) mix delayed and dask.dataframe operations like this. Additionally, you shouldn't pass large data objects into delayed functions through closures, like group in your example. Instead, include them as explicit arguments, or, in this case, don't use delayed at all and use dask.dataframe native operations or in-memory operations with dask.dataframe.map_partitions. Implementing these, I would rewrite your main function as follows:
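One possible shape of that rewrite, sketched here with map_partitions so the per-group shift and log run as plain pandas inside each partition; the column names, lag range, and cluster settings are taken from the question, everything else is illustrative:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

def add_lag_features(pdf, lags=range(5)):
    # Plain pandas: runs independently inside each partition.
    pdf = pdf.copy()
    for lag in lags:
        pdf[f"col_{lag}"] = np.log(pdf.groupby("col1")["col3"].shift(lag))
    return pdf

def main():
    # The 100M-row frame is kept from the question; note that building it in
    # pandas first is itself memory-heavy.
    df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000000),
                       "col2": np.random.randint(101, 200, 100000000),
                       "col3": np.random.uniform(0, 4, 100000000)})
    ddf = dd.from_pandas(df, npartitions=100)
    # set_index("col1") shuffles the data so each partition holds whole groups,
    # which makes the per-partition groupby above equivalent to a global one.
    ddf = ddf.set_index("col1")
    ddf = ddf.map_partitions(add_lag_features)
    ddf.to_parquet("test", engine="fastparquet")

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=6, threads_per_worker=2, memory_limit="8GB")
    client = Client(cluster)
    main()

No delayed objects are created, so nothing large is captured in a closure; the graph contains only dataframe operations, and Dask can stream partitions through the workers instead of holding whole intermediate results.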
After long periods of frustration with Dask, I think I hacked the holy grail of refactoring your pandas transformations wrapped with Dask.
Learning points:
Index intelligently. If you are grouping by or merging, you should consider indexing the columns you use for those.
Partition and repartition intelligently. If you have a dataframe of 10k rows and another of 1M rows, they should naturally have different partition counts.
Don't use Dask dataframe transformation methods except for a few such as merge; the rest should be plain pandas code wrapped in map_partitions.
Don't accumulate too large a graph, so consider saving after, for example, indexing or a complex transformation.
If possible, filter the dataframe and work with a smaller subset; you can always merge it back into the bigger data set.
If you are working on your local machine, set the memory limits within the boundaries of your system specifications. This point is very important. In the example below I create one million rows of 3 columns: one is an int64 and two are float64, each 8 bytes, 24 bytes per row in total, which gives me 24 million bytes.
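A minimal sketch of the kind of setup these points describe; the worker count, memory limits, and partition count are illustrative assumptions sized to the roughly 24 MB of data, not the answer's original example:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # 1,000,000 rows x (one int64 + two float64) = 24 bytes per row, roughly 24 MB
    # in total, so modest per-worker memory limits are plenty for this data.
    cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="2GB")
    client = Client(cluster)

    n = 1_000_000
    df = pd.DataFrame({"col1": np.random.randint(1, 100, n),  # int64
                       "col2": np.random.uniform(0, 4, n),    # float64
                       "col3": np.random.uniform(0, 4, n)})   # float64

    # Partition according to data size, then index on the column used for grouping/merging.
    ddf = dd.from_pandas(df, npartitions=8)
    ddf = ddf.set_index("col1")

    # Save after the expensive shuffle so later work starts from a short graph.
    ddf.to_parquet("indexed", engine="fastparquet")

Later steps can then dd.read_parquet the saved data and continue from an already-indexed, already-partitioned dataframe instead of replaying the shuffle.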