Dask re-executes computations in a branched graph

Posted 2025-02-04 13:46:52


Suppose I create the following graph:

import dask
import time


@dask.delayed
def step_1():
    print("Running Step 1")
    time.sleep(1)
    return True

@dask.delayed
def step_2(prev_step):
    print("Running Step 2")
    time.sleep(1)
    return True

@dask.delayed
def step_3a(prev_step):
    print("Running Step 3a")
    time.sleep(1)
    return True

@dask.delayed
def step_3b(prev_step):
    print("Running Step 3b")
    time.sleep(1)
    return True
stp_1 = step_1()
stp_2 = step_2(stp_1)
stp_3a = step_3a(stp_2)
stp_3b = step_3b(stp_2)
from dask import visualize

visualize([stp_3a, stp_3b])

[figure: branched computation graph]

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=3, dashboard_address="localhost:27998")
client = Client(cluster)
client

Now, I compute step_3a and it should take about 3 seconds.


start = time.perf_counter()

stp_3a_futures = client.compute(stp_3a) # So that the future stays in memory

stp_3a_results = client.gather(stp_3a_futures)

duration = time.perf_counter() - start

print(duration)
[Out]: 3.1600782200694084

This makes sense. But now, when I execute step_3b, I expect it to finish in one second, since step_1 and step_2 have already been computed. Unfortunately, those two results are not kept in memory, and the computation for step_3b also takes 3 seconds:


start = time.perf_counter()

stp_3b_futures = client.compute(stp_3b) # So that the future stays in memory

stp_3b_results = client.gather(stp_3b_futures)

duration = time.perf_counter() - start

print(duration)

[Out]: 3.0438701044768095

Now, my question is:

  • Is there a way to keep step_2 and step_1 in the cluster's memory using ONLY the delayed object of step_3a (i.e., stp_3a)?

I know I can call client.persist() on stp_2, but that's not the answer I'm looking for. In my use case, when I compute step_3a, I won't have any reference to the delayed object for step_2.

Thank you in advance to anyone who can answer. :)


梦中楼上月下 2025-02-11 13:46:52


Graphchain works well with recent Dask versions:

from time import sleep

from dask import delayed
from dask.config import set as dask_set
from graphchain import optimize


@delayed
def step_1():
    print("Running Step 1")
    sleep(1)
    return True


@delayed
def step_2(prev_step):
    print("Running Step 2")
    sleep(1)
    return True


@delayed
def step_3a(prev_step):
    print("Running Step 3a")
    sleep(1)
    return True


@delayed
def step_3b(prev_step):
    print("Running Step 3b")
    sleep(1)
    return True


stp_1 = step_1()
stp_2 = step_2(stp_1)
stp_3a = step_3a(stp_2)
stp_3b = step_3b(stp_2)

Now, the computations:

%time stp_3a.compute()
# Running Step 1
# Running Step 2
# Running Step 3a
# CPU times: user 330 ms, sys: 14.3 ms, total: 344 ms
# Wall time: 3.01 s

%time stp_3b.compute()
# Running Step 1
# Running Step 2
# Running Step 3b
# CPU times: user 6.4 ms, sys: 3.03 ms, total: 9.43 ms
# Wall time: 3.01 s

with dask_set(delayed_optimize=optimize):
    
    %time stp_3a.compute()
    # Running Step 1
    # Running Step 2
    # Running Step 3a
    # CPU times: user 364 ms, sys: 20.8 ms, total: 385 ms
    # Wall time: 3.04 s
    
    %time stp_3b.compute()
    # Running Step 3b
    # CPU times: user 5 ms, sys: 2.97 ms, total: 7.97 ms
    # Wall time: 1.01 s

好听的两个字的网名 2025-02-11 13:46:52


There is (was?) an attempt to do some graph-level caching/memoization, that you might find useful: https://github.com/radix-ai/graphchain

Aside from that, one can wrap all the desired results in one compute call:

results = dask.compute([stp_3a, stp_3b])

This way dask will avoid recomputing the common requirements.
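A minimal, self-contained sketch of that suggestion, with call counters added (my instrumentation, not part of the original question) to make the sharing visible:

```python
import dask

# Instrumentation: count how often the shared upstream steps actually run
calls = {"step_1": 0, "step_2": 0}

@dask.delayed
def step_1():
    calls["step_1"] += 1
    return True

@dask.delayed
def step_2(prev_step):
    calls["step_2"] += 1
    return True

@dask.delayed
def step_3a(prev_step):
    return "a"

@dask.delayed
def step_3b(prev_step):
    return "b"

stp_2 = step_2(step_1())

# One compute call over both leaves: the shared graph is evaluated as a whole
results = dask.compute(step_3a(stp_2), step_3b(stp_2))
print(results)  # ('a', 'b')
print(calls)    # {'step_1': 1, 'step_2': 1} -- each shared step ran exactly once
```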
