Dask recomputes steps in a branched graph
Suppose I create the following graph:
import dask
import time

@dask.delayed
def step_1():
    print("Running Step 1")
    time.sleep(1)
    return True

@dask.delayed
def step_2(prev_step):
    print("Running Step 2")
    time.sleep(1)
    return True

@dask.delayed
def step_3a(prev_step):
    print("Running Step 3a")
    time.sleep(1)
    return True

@dask.delayed
def step_3b(prev_step):
    print("Running Step 3b")
    time.sleep(1)
    return True

stp_1 = step_1()
stp_2 = step_2(stp_1)
stp_3a = step_3a(stp_2)
stp_3b = step_3b(stp_2)

from dask import visualize
visualize([stp_3a, stp_3b])

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=3, dashboard_address="localhost:27998")
client = Client(cluster)
client
Now, I compute step_3a, and it should take about 3 seconds.
start = time.perf_counter()
stp_3a_futures = client.compute(stp_3a) # So that the future stays in memory
stp_3a_results = client.gather(stp_3a_futures)
duration = time.perf_counter() - start
print(duration)
[Out]: 3.1600782200694084
This makes sense. But now, when I execute step_3b, I expect it to finish in one second, since step_1 and step_2 have already been computed. Unfortunately, those two steps are not kept in memory, and the computation for step_3b also takes 3 seconds:
start = time.perf_counter()
stp_3b_futures = client.compute(stp_3b) # So that the future stays in memory
stp_3b_results = client.gather(stp_3b_futures)
duration = time.perf_counter() - start
print(duration)
[Out]: 3.0438701044768095
Now, my question is:
- is there a way to keep step_1 and step_2 in the cluster's memory using ONLY the delayed object of step_3a (i.e., stp_3a)?
I know I can call client.persist() on stp_2, but that's not the answer I'm looking for. In my use-case, when I compute step_3a, I won't have any reference to the delayed object for step_2.
Thank you in advance to those of you who can answer. :)
Comments (2)
Graphain
graphchain works well with the recent dask version; the computations then reuse cached intermediate results instead of re-running them.
There is (was?) an attempt to do some graph-level caching/memoization that you might find useful: https://github.com/radix-ai/graphchain

Aside from that, one can wrap all the desired results in one compute call; this way dask will avoid recomputing the common requirements.
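A minimal sketch of that single compute call, reusing the delayed functions from the question (shown here with dask's default local scheduler for brevity; the timing comment assumes the same 1-second sleeps):

```python
import time
import dask

@dask.delayed
def step_1():
    time.sleep(1)
    return True

@dask.delayed
def step_2(prev_step):
    time.sleep(1)
    return True

@dask.delayed
def step_3a(prev_step):
    time.sleep(1)
    return True

@dask.delayed
def step_3b(prev_step):
    time.sleep(1)
    return True

stp_2 = step_2(step_1())
stp_3a = step_3a(stp_2)
stp_3b = step_3b(stp_2)

start = time.perf_counter()
# One compute call for both leaves: step_1 and step_2 each run exactly
# once, and their shared results feed both branches.
res_a, res_b = dask.compute(stp_3a, stp_3b)
duration = time.perf_counter() - start
print(res_a, res_b, duration)  # well under the ~6 s of two separate runs
```

With the distributed client from the question, the equivalent is `client.compute([stp_3a, stp_3b])` followed by `client.gather` on the returned futures.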