DASK多阶段资源设置原因无法序列化错误

发布于 2025-02-06 08:41:23 字数 6211 浏览 2 评论 0 原文

使用Dask文档的确切代码 https://jobqueue.dask.org/en/latest/latest/exampleast/examples.htmples.html

如果页面更改,这是代码:

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(memory='8g',
                       processes=1,
                       cores=2,
                       extra=['--resources ssdGB=200,GPU=2'])

cluster.scale(2)
client = Client(cluster)

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})

这将导致此类错误:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/batched.py", line 94, in _background_send
    nbytes = yield self.comm.write(
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/tcp.py", line 250, in write
    frames = await to_frames(
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/utils.py", line 50, in to_frames
    return _to_frames()
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object

Python verion:3.8.10 dask:2022.2.0 Dask-Jobqueue:0.7.3 这个问题是不言而喻的。设置就像文档中一样。我没有什么可以解释的,但是Stackoverflow说我的详细信息对代码太低,因此我需要写更多的内容以允许发布此问题。

Using the exact code from Dask's documentation at
https://jobqueue.dask.org/en/latest/examples.html

In case the page changes, this is the code:

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(memory='8g',
                       processes=1,
                       cores=2,
                       extra=['--resources ssdGB=200,GPU=2'])

cluster.scale(2)
client = Client(cluster)

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})

This results in an error of such:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/batched.py", line 94, in _background_send
    nbytes = yield self.comm.write(
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/tcp.py", line 250, in write
    frames = await to_frames(
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/utils.py", line 50, in to_frames
    return _to_frames()
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/eagleseven/pyenv/e7cloudv0/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object

Python verion: 3.8.10
dask: 2022.2.0
dask-jobqueue: 0.7.3
The problem is self-evident. Setup is just like in the documentation. There is nothing more I can explain, but stackoverflow is saying my details-to-code is too low, so I need to write more stuff to allow this question to be posted.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

为你拒绝所有暧昧 2025-02-13 08:41:23

正如@michael Delgado在评论中指出的那样,这似乎是文档的问题(升级在这里)。资源是一个词典,每个键是资源的名称和代表任务使用数量的值。

在回答相关问题时,最初的提交作者马特·罗克林(Matt Rocklin)提到此功能(指定任务级资源)经常要求,但到目前为止尚未可用: https://stackoverflow.com/a/a/63310721/10693596

一种可能性是使用注释用于图表的特定组件,请参见 this Answer

As noted by @Michael Delgado in the comments, this appears to be a problem with the documentation (raised here). Resources are a dictionary with each key being name of a resource and value representing the amount used by a task.

In an answer to a related question, Matt Rocklin, the initial commit author, mentions that this feature (specifying task-level resources) is frequently requested, but not available as of now: https://stackoverflow.com/a/63310721/10693596

One possibility is to use annotation for specific components of the graph, see this answer.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文