PyTorch Python distributed multiprocessing: gather/concatenate tensor arrays of different lengths/sizes

Posted on 2025-01-13 05:49:52

If you have tensor arrays of different lengths across several GPU ranks, the default all_gather method does not work, because it requires the lengths to be the same.

For example, if you have:

if gpu == 0:
    q = torch.tensor([1.5, 2.3], device=torch.device(gpu))
else:
    q = torch.tensor([5.3], device=torch.device(gpu))

If I need to gather these two tensor arrays as follows:

all_q = [torch.tensor([1.5, 2.3]), torch.tensor([5.3])]

the default torch.distributed.all_gather does not work, because the lengths (2 and 1) are different.

Comments (3)

凝望流年 2025-01-20 05:49:52

Since the built-in methods cannot gather tensors of different lengths directly, we need to write a custom function that follows these steps:

  1. Use dist.all_gather to get the sizes of all arrays.
  2. Find the maximum size.
  3. Pad the local array to the maximum size using zeros/constants.
  4. Use dist.all_gather to get all the padded arrays.
  5. Remove the added zeros/constants using the sizes found in step 1.

The function below does this:

import torch
import torch.distributed as dist


def all_gather(q, ws, device):
    """
    Gathers tensor arrays of different lengths across multiple gpus

    Parameters
    ----------
        q : tensor array
        ws : world size
        device : current gpu device

    Returns
    -------
        all_q : list of gathered tensor arrays from all the gpus

    """
    # Step 1: exchange the local length (q is assumed to be 1-D)
    local_size = torch.tensor(q.size(), device=device)
    all_sizes = [torch.zeros_like(local_size) for _ in range(ws)]
    dist.all_gather(all_sizes, local_size)
    # Step 2: the longest array sets the common length
    max_size = max(size.item() for size in all_sizes)

    # Step 3: pad the local array with zeros up to max_size
    size_diff = max_size - local_size.item()
    if size_diff:
        padding = torch.zeros(size_diff, device=device, dtype=q.dtype)
        q = torch.cat((q, padding))

    # Step 4: gather the equally sized padded arrays
    all_qs_padded = [torch.zeros_like(q) for _ in range(ws)]
    dist.all_gather(all_qs_padded, q)
    # Step 5: cut every array back to its true length
    all_qs = []
    for q_padded, size in zip(all_qs_padded, all_sizes):
        all_qs.append(q_padded[:size.item()])
    return all_qs

Once we can do the above, we can easily use torch.cat to concatenate the result into a single array if needed:

torch.cat(all_q)
# torch.tensor([1.5, 2.3, 5.3])

Adapted from: github
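
The five steps can also be followed without a process group. The sketch below is only an illustration: the name simulate_padded_gather is hypothetical, the ranks are modelled as entries of a Python list, and the two dist.all_gather calls are replaced by plain list operations, so it shows the data flow rather than real communication.

```python
import torch


def simulate_padded_gather(per_rank):
    """Single-process stand-in for the padded gather on 1-D tensors.

    `per_rank` holds the 1-D tensor each rank would own. No process
    group is used, so this only illustrates the pad/unpad data flow.
    """
    # Step 1: every rank learns every other rank's length.
    sizes = [t.numel() for t in per_rank]
    # Step 2: the longest tensor sets the common buffer length.
    max_size = max(sizes)
    # Step 3: each rank pads its tensor with zeros up to max_size.
    padded = [
        torch.cat((t, torch.zeros(max_size - t.numel(), dtype=t.dtype)))
        for t in per_rank
    ]
    # Step 4: equal-length buffers are what all_gather is able to exchange.
    assert all(p.numel() == max_size for p in padded)
    # Step 5: slice each buffer back to its true length.
    return [p[:n] for p, n in zip(padded, sizes)]


all_q = simulate_padded_gather([torch.tensor([1.5, 2.3]), torch.tensor([5.3])])
print([t.numel() for t in all_q])
print(torch.cat(all_q))
```

Keeping the true sizes from step 1 is what makes the padding value irrelevant: the zeros never reach the caller, so real data that happens to contain zeros is not truncated.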

旧情勿念 2025-01-20 05:49:52

Here is an extension of @omsrisagar's solution that supports tensors of any number of dimensions (not only 1-dimensional tensors).

import torch
import torch.distributed as dist


def all_gather_nd(tensor):
    """
    Gathers tensor arrays of different lengths in a list.
    The length dimension is 0. This supports any number of extra dimensions in the tensors.
    All the other dimensions should be equal between the tensors.

    Args:
        tensor (Tensor): Tensor to be gathered from the current process.

    Returns:
        (list[Tensor]): output list of tensors that can be of different sizes
    """
    world_size = dist.get_world_size()
    # Exchange the full shapes; only dimension 0 may differ between ranks
    local_size = torch.tensor(tensor.size(), device=tensor.device)
    all_sizes = [torch.zeros_like(local_size) for _ in range(world_size)]
    dist.all_gather(all_sizes, local_size)

    max_length = max(size[0] for size in all_sizes)

    # Pad along dimension 0 so every rank sends a tensor of the same shape
    length_diff = max_length.item() - local_size[0].item()
    if length_diff:
        pad_size = (length_diff, *tensor.size()[1:])
        padding = torch.zeros(pad_size, device=tensor.device, dtype=tensor.dtype)
        tensor = torch.cat((tensor, padding))

    all_tensors_padded = [torch.zeros_like(tensor) for _ in range(world_size)]
    dist.all_gather(all_tensors_padded, tensor)
    # Cut each gathered tensor back to its true length along dimension 0
    all_tensors = []
    for tensor_, size in zip(all_tensors_padded, all_sizes):
        all_tensors.append(tensor_[:size[0]])
    return all_tensors

Note that this requires that all the tensors have the same number of dimensions and have all their dimensions equal, except for the first dimension.
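
The function above does not verify this requirement itself. As a rough sketch, the gathered sizes could be checked before padding; check_gatherable is a hypothetical helper, not part of PyTorch, and it works on plain shape tuples (for example tuple(size.tolist()) for each entry of all_sizes).

```python
def check_gatherable(shapes):
    """Return the padded length along dim 0, or raise if shapes are incompatible.

    `shapes` is a list of shape tuples, one per rank. Hypothetical helper
    for illustration only.
    """
    if not shapes:
        raise ValueError("no shapes given")
    ndim = len(shapes[0])
    if ndim == 0:
        raise ValueError("0-dimensional tensors have no length dimension")
    trailing = tuple(shapes[0][1:])
    for rank, shape in enumerate(shapes):
        # Every rank must report the same number of dimensions.
        if len(shape) != ndim:
            raise ValueError(f"rank {rank}: {len(shape)} dims, expected {ndim}")
        # Every dimension except the first must match rank 0.
        if tuple(shape[1:]) != trailing:
            raise ValueError(
                f"rank {rank}: trailing dims {tuple(shape[1:])}, expected {trailing}"
            )
    # Only dimension 0 may vary; its maximum is the padded length.
    return max(shape[0] for shape in shapes)


print(check_gatherable([(2, 4), (5, 4)]))
```

Raising on every rank from the same gathered data keeps the ranks in step, since they all see identical shapes and reach the same decision.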

红尘作伴 2025-01-20 05:49:52

Padding the data has been unnecessary since PyTorch 1.6.0, which introduced all_to_all; it accepts tensors of different shapes:

import torch
import torch.distributed as dist


def all_gather_vlen(tensor: torch.Tensor, group=None) -> list[torch.Tensor]:
    """Gather tensors with the same number of dimensions but different lengths."""
    world_size = dist.get_world_size(group=group)
    # Gather lengths first
    shape = torch.as_tensor(tensor.shape, device=tensor.device)
    shapes = [torch.empty_like(shape) for _ in range(world_size)]
    dist.all_gather(shapes, shape, group=group)
    # Gather data
    inputs = [tensor] * world_size
    outputs = [
        # tolist() turns each gathered shape tensor into plain Python ints
        torch.empty(_shape.tolist(), dtype=tensor.dtype, device=tensor.device)
        for _shape in shapes
    ]
    dist.all_to_all(outputs, inputs, group=group)
    return outputs

If tensors have different numbers of dimensions:

def all_gather_vdim(tensor: torch.Tensor, group=None) -> list[torch.Tensor]:
    """Gather tensors with different number of dimensions."""
    world_size = dist.get_world_size(group=group)
    # Gather shapes first
    shapes = all_gather_vlen(
        torch.as_tensor(tensor.shape, device=tensor.device), group=group
    )
    # Gather data
    inputs = [tensor] * world_size
    outputs = [
        # tolist() turns each gathered shape tensor into plain Python ints
        torch.empty(_shape.tolist(), dtype=tensor.dtype, device=tensor.device)
        for _shape in shapes
    ]
    dist.all_to_all(outputs, inputs, group=group)
    return outputs
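
Both functions rely on sending the same tensor to every peer, which makes all_to_all behave like an all_gather. The sketch below models that exchange in a single process with plain Python lists; simulate_all_to_all is a hypothetical name and nothing here calls torch.distributed, so it only illustrates the indexing.

```python
def simulate_all_to_all(inputs_per_rank):
    """Single-process model of all_to_all over Python lists.

    inputs_per_rank[src][dst] is what rank `src` sends to rank `dst`.
    The result satisfies outputs[dst][src] == inputs_per_rank[src][dst].
    """
    world_size = len(inputs_per_rank)
    return [
        [inputs_per_rank[src][dst] for src in range(world_size)]
        for dst in range(world_size)
    ]


# Each simulated rank owns a payload of a different length.
payloads = [[1.5, 2.3], [5.3], [7.0, 8.0, 9.0]]
world_size = len(payloads)

# Mirrors `inputs = [tensor] * world_size`: the same payload goes to every peer.
inputs_per_rank = [[payload] * world_size for payload in payloads]
outputs_per_rank = simulate_all_to_all(inputs_per_rank)

# Every rank ends up with one entry per source rank, in rank order.
for rank, outputs in enumerate(outputs_per_rank):
    print(rank, [len(item) for item in outputs])
```

Because each receive buffer is sized per source rank, the shapes have to be known first, which is why both functions gather the shapes before exchanging the data.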