Using dask.delayed with xarray: the result of compute() is still lazy

Posted on 2025-01-27 14:01:54

I am trying to use Dask and xarray to run some analysis (e.g. an average) over two datasets, and then compute the difference between the two results.

This is my code:

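import dask
import xarray as xr
from dask.distributed import LocalCluster

# worker_kwargs, south, north, west, east: not shown in the question
cluster = LocalCluster(n_workers=5, threads_per_worker=3, **worker_kwargs)

def calc_avg(path):
    ds = xr.open_mfdataset(path, combine='nested', concat_dim="time",
                           parallel=True, decode_times=False, decode_cf=False)
    mean = ds['var'].sel(lat=slice(south, north), lon=slice(west, east)).mean(dim='time')
    return mean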

def diff_(x,y):
    return x-y

p1 = "/path/to/first/multi-file/dataset"
p2 = "/path/to/second/multi-file/dataset"

a = dask.delayed(calc_avg)(p1)  
b = dask.delayed(calc_avg)(p2)
total = dask.delayed(diff_)(a,b)
result = total.compute()

The execution time here is 17 s.

However, plotting the result (result.plot()) takes more than 1 min, so it seems that the calculation actually happens when trying to plot the result.

Is this the proper way to use Dask delayed?

Answer from 漫雪独思, 2025-02-03 14:01:54

You’re wrapping a call to xr.open_mfdataset, which is itself a dask operation, in a delayed function. So when you call total.compute(), you’re only executing the Python functions calc_avg and diff_. However, calc_avg returns a dask-backed DataArray, so the 17 s step just converts the delayed graph of calc_avg and diff_ into a dask.array graph of open_mfdataset and array operations. The data is not actually read and averaged until something needs the values, which in your case is result.plot().

To resolve this, drop the delayed wrappers and simply use the dask.array xarray workflow:

a = calc_avg(p1)  # this is already a dask-backed DataArray because
                  # calc_avg calls open_mfdataset
b = calc_avg(p2)  # so is this
total = a - b     # dask understands array math, so this "just works"
result = total.compute()    # execute the scheduled job

See the xarray guide to parallel computing with dask for an introduction.
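
To see the difference without any files on disk, here is a minimal sketch. It assumes a small in-memory array stands in for the multi-file dataset; lazy_mean is a hypothetical replacement for calc_avg, and the shapes and chunk sizes are arbitrary.

```python
import dask
import numpy as np
import xarray as xr


def lazy_mean(seed):
    # Hypothetical stand-in for calc_avg: builds a dask-backed DataArray
    # in memory instead of calling open_mfdataset on real files.
    rng = np.random.default_rng(seed)
    data = xr.DataArray(
        rng.random((4, 3, 2)),
        dims=("time", "lat", "lon"),
    ).chunk({"time": 2})
    return data.mean(dim="time")


# With the delayed wrapper: compute() only runs lazy_mean itself,
# and lazy_mean hands back another lazy (dask-backed) object.
wrapped = dask.delayed(lazy_mean)(0).compute()
wrapped_is_lazy = dask.is_dask_collection(wrapped)

# Without the wrapper: compute() evaluates the array graph directly.
direct = lazy_mean(0).compute()
direct_is_lazy = dask.is_dask_collection(direct)

print("wrapped still lazy:", wrapped_is_lazy)
print("direct still lazy:", direct_is_lazy)
```

The wrapped result should still report as a dask collection, so any later use of its values (plotting, .values, .load()) triggers the real computation, while the direct result is already held in memory.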
