返回介绍

数学基础

统计学习

深度学习

工具

Scala

一、基础概念

发布于 2023-07-17 23:38:23 字数 57038 浏览 0 评论 0 收藏 0

  1. Accelerate 是一个库,只需添加四行代码,就可以在任何分布式 configuration 中运行相同的 PyTorch 代码:

    ​x
    + from accelerate import Accelerator
    + accelerator = Accelerator()
    
    
    + model, optimizer, training_dataloader, scheduler = accelerator.prepare(
    +     model, optimizer, training_dataloader, scheduler
    + )
    
    
      for batch in training_dataloader:
          optimizer.zero_grad()
          inputs, targets = batch
          inputs = inputs.to(device)
          targets = targets.to(device)
          outputs = model(inputs)
          loss = loss_function(outputs, targets)
    +     accelerator.backward(loss)
          optimizer.step()
          scheduler.step()

    上述代码可以通过 AccelerateCLI 接口在任何系统上启动:

    
    
    xxxxxxxxxx
    accelerate launch {my_script.py}
  2. 安装:

    
    
    xxxxxxxxxx
    pip install accelerate conda install -c conda-forge accelerate pip install git+https://github.com/huggingface/accelerate
  3. 配置:

    
    
    xxxxxxxxxx
    accelerate config

    然后 accelerate 会向你询问一些问题从而生成配置。

    检查配置:

    
    
    xxxxxxxxxx
    accelerate env
  4. 为了使用 Accelerate,你只需要修改四件事:

    • 首先,导入 Accelerator 并创建一个 accelerator 对象:

      
      
      xxxxxxxxxx
      from accelerate import Accelerator accelerator = Accelerator()
    • 然后,移除针对你的模型和输入数据的所有 .to(device).cuda() 的调用。accelerator 将会为你正确处理这个问题,并为你把所有这些对象放在正确的设备上。

      如果你知道你在做什么,你可以保留那些 .to(device) 的调用,但你应该使用 accelerator 对象提供的设备: accelerator.device

      要完全停用自动的 device placement ,在初始化 Accelerator 时传递 device_placement=False

    • 接着,将所有与训练有关的对象(optimizer, model, training dataloader, learning rate scheduler )传递给accelerator.prepare() 方法。这将确保一切都为训练做好准备。

      
      
      xxxxxxxxxx
      model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare( model, optimizer, train_dataloader, lr_scheduler )

      具体而言,training dataloader 将被分片到所有可用的 GPU/TPU 核心上,这样每个设备都能看到训练数据集的不同部分。此外,所有进程的随机数状态将在每次迭代开始时通过 dataloader 进行同步,以确保数据以相同的方式被混洗(如果你决定使用shuffle=True 或任何类型的 random sampler )。

      训练的实际 batch size 将是使用的设备数量乘以你在脚本中设置的 batch size 。另外,你可以在创建 accelerator 对象时使用 split_batches=True 参半,此时无论你在多少个 GPU 上运行你的脚本,实际 batch size 都会保持不变。

      你需要再开始实际的 training loop 之前执行 accelerator.prepare()

      只有当 scheduler 需要再每个 optimizer step 中被 stepped 时,才需要把 learning rate scheduler 传递给 prepare()

      任何获取 training dataloader length 的方法(例如,你需要记录 total training step)都应该在 accelerator.prepare() 之后进行。

      你可能想、也可能不想把你的validation dataloader 发送到 prepare() ,这取决于你是否想运行分布式评估。

    • 最后,用 accelerator.backward(loss) 替换 loss.backward()

    现在,你的脚本将在你的本地机器上运行,也可以在多个 GPUTPU 上运行。你可以使用你喜欢的工具来启动分布式训练,或者你可以使用 Accelerate launcher 启动。

  5. 分布式评估:

    • 可以进行常规评估,此时你需要将 validation dataloader 保持在 accelerator.prepare() 之外。并且,你需要手动将 input 数据放在 accelerator.device 上。

    • 也可以进行分布式评估,此时你需要将 validation dataloader 放置在 accelerator.prepare() 之内:

      
      
      xxxxxxxxxx
      validation_dataloader = accelerator.prepare(validation_dataloader)

      就像 training dataloader,这意味着在分布式评估时,每个设备将仅看到部分 evaluation 数据。这意味着你需要把 predictions 进行 group 。可以通过 accelerator.gather_for_metrics() 方法来实现:

      
      
      xxxxxxxxxx
      for inputs, targets in validation_dataloader: predictions = model(inputs) # Gather all predictions and targets all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets)) # Example of use with a Datasets.Metric metric.add_batch(all_predictions, all_targets)

      类似 training dataloader ,把 validation dataloader 传入 prepare() 可能会改变该 dataloader :如果你在n$ n $ 个 GPU 上运行,则它的长度将被除以n$ n $ (因为你的实际 batch size 将被乘以n$ n $ ),除非你设置 split_batches=True

      任何获取 validation dataloader length 的方法都应该在 accelerator.prepare() 之后进行。

      数据集末尾的一些数据可能是重复的,所以这个 batch 的数据可以平均分配给所有的工作者。因此,应该通过gather_for_metrics() 方法计算指标,以便在收集时自动删除重复的数据。如果出于某种原因,你不希望自动完成这项工作,可以用 accelerator.gather() 来收集所有进程的数据,然后手动完成。

      gather()gather_for_metrics() 要求每个进程上的张量是相同尺寸的。如果你在每个进程上有不同尺寸的张量(例如,当动态填充到一个 batch 的最大长度时),你应该使用 accelerator.gather.pad_across_processes() 方法将张量填充到跨进程的最大尺寸。

  6. 启动分布式脚本:你可以使用常规命令来启动你的分布式训练(如 PyTorchtorch.distributed.launch ),它们与 Accelerate 完全兼容。这里唯一需要注意的是: Accelerate 使用 environment 来确定所有有用的信息,所以 torch.distributed.launch 应与标志 --use_env 一起使用。

    Accelerate 还提供了一个 CLI 工具,它统一了所有的 launcher ,所以你只需要记住一个命令:

    
    
    xxxxxxxxxx
    accelerate config

    你需要回答问题,然后 Accelerate 将在你的 cache folder 创建一个 default_config.yaml 文件。这个缓存目录是(根据优先级递减):

    • 环境变量 HF_HOME 的内容,以 accelerate 为后缀。
    • 如果不存在,则环境变量 XDG_CACHE_HOME 的内容,以 huggingface/accelerate 为后缀。
    • 如果也不存在,则为 ~/.cache/huggingface/accelerate

    你也可以通过标志 --config_file 来指定你要保存的文件的位置。

    然后,你可以通过运行来测试你的设置是否一切顺利:

    
    
    xxxxxxxxxx
    accelerate test

    这将启动一个简短的脚本,测试分布式环境。你也可以在测试期间指定配置文件的位置:

    
    
    xxxxxxxxxx
    accelerate test --config_file path_to_config.yaml

    如果测试通过,你可以通过如下的命令来执行你的脚本:

    
    
    xxxxxxxxxx
    accelerate launch path_to_script.py --args_for_the_script

    也可以指定配置文件的位置:

    
    
    xxxxxxxxxx
    accelerate launch --config_file path_to_config.yaml path_to_script.py --args_for_the_script
  7. notebook 中启动:在 Accelerate 0.3.0 中引入了一个 notebook_launcher() 从而帮助你在 notebook 上启动训练。

    只要在 notebook 的一个 cell 中定义一个负责整个 train and/or evaluation 的函数,然后用以下代码执行一个 cell

    
    
    xxxxxxxxxx
    from accelerate import notebook_launcher notebook_launcher(training_function)

    注意:你的 Accelerator 对象应该只在 training_function 中定义,这是因为初始化应该只在 launcher 内完成。

  8. TPU 上训练:如果你想在 TPU 上启动你的脚本,有一些注意事项是你应该注意的。在幕后,TPU 将为你的 training step (前向传播、反向传播、以及 optimizer step )中发生的所有操作创建一个 graph 。这就是为什么你的第一个训练步总是非常长,因为建立和编译这个 graph 需要一些时间。

    好消息是,这个编译将被缓存,所以第二步和所有后续的 step 将更快。坏消息是,这只适用于你的所有 step 做完全相同的操作,这意味着:

    • 所有 batch 必须有用相同的张量尺寸。
    • 必须使用静态的代码(即,如果单个 step 中存在循环,那么循环次数在每个 step 必须相同)。

    如果上述任何一项在两个 step 之间发生变化,都会触发新的编译,这将再次花费大量时间。在实践中,这意味着:你必须特别注意让你的输入中的所有张量具有相同的形状(所以没有动态填充),并且不应该使用具有 for 循环的层,其中 for 循环的根据 input 的不同而具有不同长度(如 LSTM)。否则,训练会慢得令人难受。

    可以针对 TPU 执行一些特殊的代码:

    
    
    xxxxxxxxxx
    from accelerate import DistributedType if accelerator.distributed_type == DistributedType.TPU: # do something of static shape else: # go crazy and be dynamic

    最后要注意的是:如果你的模型有 tied weight (比如语言模型将 embedding matrix 的权重与 decoder 的权重绑定),将这个模型移动到 TPU (无论是你自己移动、还是由 prepare() 移动)会破坏绑定。你将需要在之后重新绑定权重。

  9. 在单个进程上执行的语句:有些语句只需要在特定的进程上执行而无需在所有进程上执行,如数据下载、记录日志、以及打印进度条。此时可以执行:

    
    
    xxxxxxxxxx
    if accelerator.is_local_main_process: # Is executed once per server from tqdm.auto import tqdm progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)

    local 意思是每台机器上运行:如果你在两台服务器上训练,其中每台服务器有几个 GPU ,则代码将在每台服务器上执行一次。

    如果你希望对所有进程仅执行一次(如,上传模型到 model hub),则可以执行:

    
    
    xxxxxxxxxx
    if accelerator.is_main_process: # Is executed once only

    对于 print 语句,你希望在每台机器上执行一次,则可以用 accelerator.print 代替 print 函数。

  10. 延迟执行:当你运行你的常规脚本时,指令是按顺序执行的。使用 Accelerate 在几个 GPU 上同时部署你的脚本会带来一个复杂的问题:虽然每个进程都是按顺序执行所有指令,但有些可能比其他的快。

    你可能需要等待所有进程达到一定程度后再执行某条指令。例如,在确定每个进程都完成了训练之前,你不应该保存一个模型。要做到这一点,可以执行:

    
    
    xxxxxxxxxx
    accelerator.wait_for_everyone()

    这条指令将阻塞所有先到的进程,直到所有其他进程都到达该点(如果你只在一个 GPUCPU 上运行你的脚本,这不会有任何作用)。

  11. 保存/加载模型:保存训练好的模型可能需要一些调整:

    • 首先,你应该等待所有的进程到达脚本中的 “延迟执行” 所描述的那个点。

    • 然后,你应该在保存模型之前 unwrap 你的模型。这是因为在通过 prepare() 方法时,你的模型可能被 wrap 从而用于分布式训练。如:

      
      
      xxxxxxxxxx
      accelerator.wait_for_everyone() unwrapped_model = accelerator.unwrap_model(model) accelerator.save(unwrapped_model.state_dict(), filename)

      如果你的脚本包含加载 checkpoint 的逻辑,我们也建议你在 unwrapped model 中加载你的权重(这只在 prepare() 后使用加载函数时有用)。如:

      
      
      xxxxxxxxxx
      unwrapped_model = accelerator.unwrap_model(model) unwrapped_model.load_state_dict(torch.load(filename))
  12. 保存/加载整个状态:当训练你的模型时,你可能想保存模型、优化器、随机数生成器、以及潜在的 LR scheduler 的当前状态,以便在同一个脚本中恢复训练。你可以分别使用 save_state()load_state() 来做到这一点,只需简单地传入一个保存位置。

    如果你通过 register_for_checkpointing() 注册了任何其他需要存储的 stateful item ,它们也会被保存和/或加载。

    示例:

    
    
    xxxxxxxxxx
    from accelerate import Accelerator import torch accelerator = Accelerator() my_scheduler = torch.optim.lr_scheduler.StepLR(my_optimizer, step_size=1, gamma=0.99) my_model, my_optimizer, my_training_dataloader = accelerate.prepare(my_model, my_optimizer, my_training_dataloader) # Register the LR scheduler accelerate.register_for_checkpointing(my_scheduler) # Save the starting state accelerate.save_state("my/save/path") device = accelerator.device my_model.to(device) # Perform training for epoch in range(num_epochs): for batch in my_training_dataloader: my_optimizer.zero_grad() inputs, targets = batch inputs = inputs.to(device) targets = targets.to(device) outputs = my_model(inputs) loss = my_loss_function(outputs, targets) accelerator.backward(loss) my_optimizer.step() my_scheduler.step() # Restore previous state accelerate.load_state("my/save/path")
  13. 梯度裁剪:如果你在脚本中使用梯度剪裁,你应该把对 torch.nn.utils.clip_grad_norm_torch.nn.utils.clip_grad_value_ 的调用分别替换为 accelerator.clipgrad_norm()accelerator.clipgrad_value()

  14. 混合精度训练:如果你用 Accelerate 在混合精度下训练,那么模型内的计算将以混合精度进行,而模型外的每一次计算都将以 full precision 执行。例如,loss 的计算通常在模型外,且涉及 softmax 。然而,你可能想把你的 loss 计算放在 accelerator.autocast上下文管理器中:

    
    
    xxxxxxxxxx
    with accelerator.autocast(): loss = complex_loss_function(outputs, target)

    混合精度训练的另一个注意事项是:梯度会在开始时跳过一些更新,有时在训练过程中也会跳过。这是因为动态损失缩放 dynamic loss scaling 策略,在训练过程中会有一些时刻,梯度已经溢出,loss scaling factor 会减少,从而避免在下一步再次发生这种情况。

    这意味着你可能会在没有梯度更新的时候就更新你的 learning rate scheduler 。这在一般情况下是没有问题的,但是当你的训练数据非常少,或者你的 scheduler 的第一个学习率值非常重要时,可能会有影响。在这种情况下,你可以跳过 learning rate scheduler 的更新:

    
    
    xxxxxxxxxx
    if not accelerator.optimizer_step_was_skipped: lr_scheduler.step()
  15. 梯度累积:要执行梯度累积,请使用 accumulate() 并指定 gradient_accumulation_steps 。在多设备训练时,这也会自动确保梯度同步或不同步,检查是否真的应该执行该 step ,并自动计算损失:

    
    
    xxxxxxxxxx
    accelerator = Accelerator(gradient_accumulation_steps=2) model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader) for input, label in training_dataloader: with accelerator.accumulate(model): predictions = model(input) loss = loss_function(predictions, label) accelerator.backward(loss) optimizer.step() scheduler.step() optimizer.zero_grad()

    相比之下,传统的梯度累加方法会用更冗长的代码:

    
    
    xxxxxxxxxx
    + from accelerate import Accelerator + accelerator = Accelerator() + model, optimizer, training_dataloader, scheduler = accelerator.prepare( + model, optimizer, training_dataloader, scheduler + ) for index, batch in enumerate(training_dataloader): inputs, targets = batch - inputs = inputs.to(device) - targets = targets.to(device) outputs = model(inputs) loss = loss_function(outputs, targets) loss = loss / gradient_accumulation_steps + accelerator.backward(loss) if (index+1) % gradient_accumulation_steps == 0: optimizer.step() scheduler.step() optimizer.zero_grad()
  16. DeepSpeedDeepSpeed 支持是实验性的,所以底层 API 将在不久的将来发展,可能会有一些轻微的破坏性变化。具体而言, Accelerate 还不支持你自己编写的 DeepSpeed 配置,这将在下一个版本中添加。

  17. 使用 accelerate launch

    
    
    xxxxxxxxxx
    accelerate launch {script_name.py} --arg1 --arg2 ...

    指定单个 GPU

    
    
    xxxxxxxxxx
    CUDA_VISIBLE_DEVICES="0" accelerate launch {script_name.py} --arg1 --arg2 ...

    在两个 GPU 上混合精度训练:

    
    
    xxxxxxxxxx
    accelerate launch --multi_gpu --mixed_precision=fp16 --num_processes=2 {script_name.py} {--arg1} {--arg2} ...

    建议总是在 accelerate launch 之前执行 accelerate config ,这样就无需再 accelerate launch 中指定各种配置。

  18. notebooklaunch

    • 确保任何使用 CUDA 的代码在一个函数中,该函数被传递给 notebook_launcher()
    • 设置 num_processes 为训练的设备数量(如,GPU, CPU, TPU 数量)。
    • 如果使用 TPU ,在 training loop 函数之外声明你的模型。

    如:

    
    
    xxxxxxxxxx
    from accelerate import notebook_launcher args = ("fp16", 42, 64) notebook_launcher(training_loop, args, num_processes=2)

    对于 TPU

    
    
    xxxxxxxxxx
    model = create_model("resnet50d", pretrained=True, num_classes=len(label_to_id)) args = (model, "fp16", 42, 64) notebook_launcher(training_loop, args, num_processes=8)
  19. 启用 FSDP

    • 首先进行配置:accelerate configFSDP 配置的一个例子:

      
      
      xxxxxxxxxx
      compute_environment: LOCAL_MACHINE deepspeed_config: {} distributed_type: FSDP downcast_bf16: 'no' fsdp_config: fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP fsdp_backward_prefetch_policy: BACKWARD_PRE fsdp_offload_params: false fsdp_sharding_strategy: 1 fsdp_state_dict_type: FULL_STATE_DICT fsdp_transformer_layer_cls_to_wrap: GPT2Block machine_rank: 0 main_process_ip: null main_process_port: null main_training_function: main mixed_precision: 'no' num_machines: 1 num_processes: 2 use_cpu: false

      然后开始训练:

      
      
      xxxxxxxxxx
      accelerate launch examples/nlp_example.py
    • 这些配置参数的含义为:

      • Sharding Strategy

        • FULL_SHARD:对 optimizer states, gradients, parameters 都进行分片。
        • SHARD_GRAD_OP:仅对 optimizer states, gradients 进行分片。
        • NO_SHARD:不进行分片。
      • Offload Params:一个布尔值,指定是否将 parametersgradients 卸载到 CPU

      • Auto Wrap Policy:可以为 TRANSFORMER_BASED_WRAP, SIZE_BASED_WRAP, NO_WRAP

      • Transformer Layer Class to Wrap:当使用 TRANSFORMER_BASED_WRAP 时,指定特定的 transformer layer class name (大小写敏感)从而执行 wrap 。如 BertLayer, GPTJBlock, T5Block,...

      • Min Num Params:使用 SIZE_BASED_WRAP 的最小参数数量。

      • Backward Prefetch:可以为 BACKWARD_PRE, BACKWARD_POST, NO_PREFETCH

      • State Dict Type:可以为 FULL_STATE_DICT, LOCAL_STATE_DICT, SHARDED_STATE_DICT

    • 有几个需要注意的地方:

      • PyTorch FSDP 会自动 wrap 子模块,对参数进行扁平化处理,并将参数分片。由于这个原因,任何在 model wrapping 之前创建的 optimizer 都会被破坏,并占用更多的内存。因此,强烈建议在创建 optimizer 之前准备好模型,这也是很有效的。Accelerate 将自动 wrap 模型,并在单个模型的情况下为你创建一个优化器,并发出警告信息:

        
        
        xxxxxxxxxx
        FSDP Warning: When using FSDP, it is efficient and recommended to call prepare for the model before creating the optimizer.

        下面是使用 FSDP 时准备模型和优化器的推荐方法:

        
        
        xxxxxxxxxx
        model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True) + model = accelerator.prepare(model) optimizer = torch.optim.AdamW(params=model.parameters(), lr=lr) - model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare( - model, optimizer, train_dataloader, eval_dataloader, lr_scheduler - ) + optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare( + optimizer, train_dataloader, eval_dataloader, lr_scheduler + )
      • 在单个模型的情况下,如果你用多个 parameter groups创建了优化器,并且用它们一起调用 prepare ,那么 parameter groups 将被丢失,并显示以下警告:

        
        
        xxxxxxxxxx
        FSDP Warning: When using FSDP, several parameter groups will be conflated into a single one due to nested module wrapping and parameter flattening.

        这是因为,由于嵌套的 FSDP 模块的参数扁平化为一维数组,在 wrapping 前创建的 parameter groupswrapping 后将没有意义。

      • 在有多个模型的情况下,有必要在创建优化器之前准备好模型,否则会抛出一个错误。然后将优化器以与相应模型相同的顺序传递给 prepare() 方法,否则 accelerator.save_state()accelerator.load_state() 将导致错误/意外的行为。

      • 这个功能与 Transformers libraryrun_translation.py 脚本中的 --predict_with_generate 不兼容。

    • 对于更多的控制,用户可以利用 FullyShardedDataParallelPlugin 。在创建这个类的实例后,用户可以把它传递给 Accelerator 类的实例。

  20. 启用 DeepSpeedDeepSpeed 实现了 ZeRO 论文中描述的一切。目前,它提供了如下的支持:Optimizer state partitioningZeRO stage 1)、Gradient partitioningZeRO stage 2)、Parameter partitioningZeRO stage 3)、Custom mixed precision training handling、一系列基于 CUDA 扩展的快速优化器、ZeRO-OffloadCPUDisk/NVMe

    DeepSpeed ZeRO-2 主要只用于训练,因为它的功能对推理没有用处。DeepSpeed ZeRO-3 也可以用于推理,因为它允许在多个 GPU 上加载巨大的模型。

    • Accelerate 通过两种方式集成 DeepSpeed

      • 通过 deepspeed 配置文件来集成。它支持 DeepSpeed 的所有核心功能,并为用户提供了很大的灵活性。用户可能需要根据配置来改变几行代码。
      • 通过 deepspeed_plugin 来集成。这支持 DeepSpeed 功能的子集,并对其余的配置使用默认选项。用户不需要改变任何代码。
    • 什么被集成了?

      • 训练:DeepSpeed ZeRO 训练支持完整的 ZeRO stages 1, 2 and 3、以及 optimizer states, gradients and parametersCPU/Disk offload

        • Stage 1:将 optimizer states 分片到数据并行 workers/GPUs 上。
        • Stage 2:将 optimizer states + gradients 分片到数据并行 workers/GPUs 上。
        • Stage 3:将 optimizer states + gradients + model parameters 分片到数据并行 workers/GPUs 上。
        • Optimizer Offload:将 optimizer states + gradients 卸载到 CPU/Disk ,建立在 ZERO Stage 2 之上。
        • Param Offload:将 model parameters 卸载到 CPU/Disk ,建立在 ZERO Stage 3 之上。

        注意:关于 Disk Offload ,磁盘应该是 NVME 的,以便有好的速度,但技术上可以在任何磁盘上工作。

      • 推断:DeepSpeed ZeRO Inference 支持 ZeRO Stage 3ZeRO-Infinity 。它使用与训练相同的 ZeRO 协议,但它不使用优化器和 lr scheduler

    • 如何工作:

      • 首先安装 DeepSpeed version >=0.6.5

      • 然后配置:accelerate config 。一个配置的例子:

        
        
        xxxxxxxxxx
        compute_environment: LOCAL_MACHINE deepspeed_config: gradient_accumulation_steps: 1 gradient_clipping: 1.0 offload_optimizer_device: none offload_param_device: none zero3_init_flag: true zero_stage: 2 distributed_type: DEEPSPEED fsdp_config: {} machine_rank: 0 main_process_ip: null main_process_port: null main_training_function: main mixed_precision: fp16 num_machines: 1 num_processes: 2 use_cpu: false
      • 最后执行训练:accelerate launch examples/nlp_example.py

    • 配置参数的含义:

      • zero_stage0 表示禁用,1 表示 optimizer state partitioning2 表示 optimizer+gradient state partitioning3 表示 optimizer+gradient+parameter partitioning
      • gradient_accumulation_steps:一个整数,表示在 averagingapplying 这些梯度之前,积累梯度的 training steps 数量。
      • gradient_clipping:一个浮点数,指定启用梯度剪裁的值。
      • offload_optimizer_devicenone 表示禁用 optimizer offloadingcpu 表示 offload optimizerCPUnvme 表示offload optimizerNVMe SSD 。仅适用于 ZeRO >= Stage-2
      • offload_param_devicenone 表示禁用 parameter offloadingcpu 表示 offload parameterCPUnvme 表示offload parameterNVMe SSD 。仅适用于 ZeRO Stage-3
      • zero3_init_flag:决定是否启用 deepspeed.zero.Init 来构建大规模模型。只适用于 ZeRO Stage-3
      • zero3_save_16bit_model:决定是否在使用 ZeRO Stage-3 时保存 16 位模型权重。
      • mixed_precisionno 用于 FP32 训练,fp16 用于 FP16 混合精度训练,bf16用于 BF16 混合精度训练。
    • 当使用配置文件时,需要修改一些代码:

      • DeepSpeed Optimizers and Schedulers

        • 如果是 DeepSpeed Optim + DeepSpeed Scheduler :用户必须使用 enhance.utils.DummyOptimenhance.utils.DummyScheduler 来取代他们代码中的 PyTorch/Custom 优化器和调度器:

          
          
          xxxxxxxxxx
          # Creates Dummy Optimizer if `optimizer` was spcified in the config file else creates Adam Optimizer optimizer_cls = ( torch.optim.AdamW if accelerator.state.deepspeed_plugin is None or "optimizer" not in accelerator.state.deepspeed_plugin.deepspeed_config else DummyOptim ) optimizer = optimizer_cls(optimizer_grouped_parameters, lr=args.learning_rate) # Creates Dummy Scheduler if `scheduler` was spcified in the config file else creates `args.lr_scheduler_type` Scheduler if ( accelerator.state.deepspeed_plugin is None or "scheduler" not in accelerator.state.deepspeed_plugin.deepspeed_config ): lr_scheduler = get_scheduler( name=args.lr_scheduler_type, optimizer=optimizer, num_warmup_steps=args.num_warmup_steps, num_training_steps=args.max_train_steps, ) else: lr_scheduler = DummyScheduler( optimizer, total_num_steps=args.max_train_steps, warmup_num_steps=args.num_warmup_steps )
        • Custom Optim + Custom Scheduler:当 DeepSpeed 配置文件中没有 optimizer keyscheduler key 的情况。在这种情况下,不需要用户修改代码,通过 DeepSpeed Plugin 使用集成时就是这种情况。

        • Custom Optim + DeepSpeed Scheduler :这种情况下,用户必须使用accelerate.utils.DummyScheduler 来替换代码中的 PyTorch/Custom scheduler

        • DeepSpeed Optim + Custom Scheduler:这将导致一个错误,因为当使用 DeepSpeed Optim 时必须使用 DeepSpeed Scheduler

      • DeepSpeed 配置文件中存在一些 "auto" 值,这些值是由 prepare 方法根据所提供的模型、dataloadersdummy optimizerdummy schedulers 自动处理的。那些不是 "auto" 的字段必须由用户明确指定。如 zero_stage2_config.json 文件:

        
        
        xxxxxxxxxx
        { "fp16": { "enabled": true, "loss_scale": 0, "loss_scale_window": 1000, "initial_scale_power": 16, "hysteresis": 2, "min_loss_scale": 1 }, "optimizer": { "type": "AdamW", "params": { "lr": "auto", "weight_decay": "auto", "torch_adam": true, "adam_w_mode": true } }, "scheduler": { "type": "WarmupDecayLR", "params": { "warmup_min_lr": "auto", "warmup_max_lr": "auto", "warmup_num_steps": "auto", "total_num_steps": "auto" } }, "zero_optimization": { "stage": 2, "allgather_partitions": true, "allgather_bucket_size": 2e8, "overlap_comm": true, "reduce_scatter": true, "reduce_bucket_size": "auto", "contiguous_gradients": true }, "gradient_accumulation_steps": 1, "gradient_clipping": "auto", "steps_per_print": 2000, "train_batch_size": "auto", "train_micro_batch_size_per_gpu": "auto", "wall_clock_breakdown": false }
    • 保存和加载:

      • 对于 ZeRO Stage-1ZeRO Stage-2,模型的保存和加载不需要改动。

      • 对于 ZeRO Stage-3state_dict 仅只包含占位符,因为模型的权重被分片到多个 GPUZeRO Stage-3 有两个选项:

        • 保存整个 16 位的模型权重,然后使用 model.load_state_dict(torch.load(pytorch_model.bin)) 来直接加载。为此,要么在 DeepSpeed 配置文件中把 zero_optimization.stage3_gather_16bit_weights_on_model_save 设为True ,要么在 DeepSpeed Plugin 中把 zero3_save_16bit_model 设为 True

          请注意,这个选项需要在一个 GPU 上整合权重,这可能会很慢,而且对内存要求很高,所以只有在需要时才使用这个功能。

          示例:

          
          
          xxxxxxxxxx
          unwrapped_model = accelerator.unwrap_model(model) # New Code # # Saves the whole/unpartitioned fp16 model when in ZeRO Stage-3 to the output directory if # `stage3_gather_16bit_weights_on_model_save` is True in DeepSpeed Config file or # `zero3_save_16bit_model` is True in DeepSpeed Plugin. # For Zero Stages 1 and 2, models are saved as usual in the output directory. # The model name saved is `pytorch_model.bin` unwrapped_model.save_pretrained( args.output_dir, is_main_process=accelerator.is_main_process, save_function=accelerator.save, state_dict=accelerator.get_state_dict(model), )
        • 为了获得 32 位的权重,首先使用 model.save_checkpoint() 保存模型:

          
          
          xxxxxxxxxx
          success = model.save_checkpoint(PATH, ckpt_id, checkpoint_state_dict) status_msg = "checkpointing: PATH={}, ckpt_id={}".format(PATH, ckpt_id) if success: logging.info(f"Success {status_msg}") else: logging.warning(f"Failure {status_msg}")

          这将在 checkpoint 目录下创建 ZeRO modeloptimizerpartitions 以及 zero_to_fp32.py 脚本。你可以使用这个脚本来做离线整合,这不需要配置文件或 GPU 。如:

          
          
          xxxxxxxxxx
          cd /path/to/checkpoint_dir ./zero_to_fp32.py . pytorch_model.bin # Processing zero checkpoint at global_step1 # Detected checkpoint of type zero stage 3, world_size: 2 # Saving fp32 state dict to pytorch_model.bin (total_numel=60506624)

          要想加载 32 位的模型,做法如下:

          
          
          xxxxxxxxxx
          from deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint unwrapped_model = accelerator.unwrap_model(model) fp32_model = load_state_dict_from_zero_checkpoint(unwrapped_model, checkpoint_dir)

          如果你仅仅想得到 state_dict,做法如下:

          
          
          xxxxxxxxxx
          from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir)

          注意,加载时需要大约 2 倍于 final checkpoint 大小的内存。

    • ZeRO InferenceDeepSpeed ZeRO Inference 支持 ZeRO stage 3 。通过 accelerate 的集成,你只需要 prepare 模型和 dataloader ,如下:

      
      
      xxxxxxxxxx
      model, eval_dataloader = accelerator.prepare(model, eval_dataloader)
    • 注意事项:

      • 目前的集成不支持 DeepSpeedPipeline Parallelism
      • 当前的集成不支持 mpu ,限制了 Megatron-LM 中支持的张量并行。
      • 目前的集成不支持多个模型。
  21. 目前 Accelerate 支持如下的 trackerTensorBoard, WandB, CometML, MLFlow ,如:

    
    
    xxxxxxxxxx
    from accelerate import Accelerator from accelerate.utils import LoggerType accelerator = Accelerator(log_with="all") # For all available trackers in the environment accelerator = Accelerator(log_with="wandb") accelerator = Accelerator(log_with=["wandb", LoggerType.TENSORBOARD])

    然后需要初始化 tracker

    
    
    xxxxxxxxxx
    hps = {"num_iterations": 5, "learning_rate": 1e-2} accelerator.init_trackers("my_project", config=hps)

    然后记录日志:

    
    
    xxxxxxxxxx
    accelerator.log({"train_loss": 1.12, "valid_loss": 0.8}, step=1)

    最后在训练结束时调用:accelerator.end_training()

    你也可以通过 accelerator.get_tracker 来获取内置的 tracker 对象:

    
    
    xxxxxxxxxx
    wandb_tracker = accelerator.get_tracker("wandb") if accelerator.is_main_process: wandb_run.log_artifact(some_artifact_to_log)
  22. 处理大模型:常规的加载预训练模型的方式:

    
    
    xxxxxxxxxx
    import torch my_model = ModelClass(...) # step 1 state_dict = torch.load(checkpoint_file) # step 2 my_model.load_state_dict(state_dict) # step 3

    这对于常规大小的模型而言很有效,但是无法处理大型模型:在 step 1 我们在 RAM 中加载一个完整版本的模型,并花一些时间随机初始化权重(这将在 step 3 被丢弃);在 step 2 ,我们在 RAM 中加载另一个完整版本的模型,并使用预训练的权重。

    Accelerate 提供一些工具来帮助处理大模型(这些 API 是实验性质的,未来可能会发生改变):

    • init_empty_weights 上下文管理器:初始化一个模型而无需使用任何内存。这依赖于 PyTorch 1.9 中引入的 meta device

      
      
      xxxxxxxxxx
      from accelerate import init_empty_weights with init_empty_weights(): my_model = ModelClass(...)

      在该上下文管理器中,每当有一个 parameter 被创建时,它就被立即移动到 meta device

    • sharded checkpoints:有可能你的模型太大从而无法装入内存,这并不意味着它不能被加载:如果你有一个或几个 GPU ,这就有更多的内存可用于存储你的模型。此时需要你的 checkpoint 被拆分为几个小文件,即 checkpoint shards

      Accelerate 将处理 checkpoint shards ,但是要满足如下格式:你的 checkpoint shards 应该放在一个文件夹中,并且有几个包含部分 state dict 的文件、以及一个 index.json 文件(将 parameter name 映射到包含该 parameter weights 的文件)。如:

      
      
      xxxxxxxxxx
      first_state_dict.bin index.json second_state_dict.bin

      其中 index.json 内容为:

      
      
      xxxxxxxxxx
      { "linear1.weight": "first_state_dict.bin", "linear1.bias": "first_state_dict.bin", "linear2.weight": "second_state_dict.bin", "linear2.bias": "second_state_dict.bin" }
    • load_checkpoint_and_dispatch:在 empty model 中加载一个 checkpoint 。它支持 full checkpoints (包含整个 state dict 的单一文件)以及 sharded checkpoints 。它还会在你可用的设备( GPUCPU )上自动分配这些权重,所以如果你正在加载一个 sharded checkpoints ,最大的RAM 用量将是最大分片的大小。

      例如:

      
      
      xxxxxxxxxx
      git clone https://huggingface.co/sgugger/sharded-gpt-j-6B cd sharded-gpt-j-6B git-lfs install git pull

      初始化模型:

      
      
      xxxxxxxxxx
      from accelerate import init_empty_weights from transformers import AutoConfig, AutoModelForCausalLM checkpoint = "EleutherAI/gpt-j-6B" config = AutoConfig.from_pretrained(checkpoint) with init_empty_weights(): model = AutoModelForCausalLM.from_config(config)

      加载权重:

      
      
      xxxxxxxxxx
      from accelerate import load_checkpoint_and_dispatch model = load_checkpoint_and_dispatch( model, "sharded-gpt-j-6B", device_map="auto", no_split_module_classes=["GPTJBlock"] )

      通过 device_map="auto"Accelerate 根据可用资源自动决定将模型的每一层放在哪里:

      • 首先,我们使用 GPU 上的最大可用空间。
      • 如果我们仍然需要空间,我们将剩余的权重存储在 CPU 上。
      • 如果没有足够的 RAM ,我们将剩余的权重作为内存映射的张量存储在硬盘上。

      no_split_module_classes=["GPTJBlock"] 表示属于 GPTJBlock 的模块不应该在不同的设备上分割。你应该在这里设置所有包括某种残差连接的 block

      可以通过 model.hf_device_map 查看模型的权重的设备。

  23. 分布式训练的复现:

    • 设置随机数种子:

      
      
      xxxxxxxxxx
      from accelerate import set_seed set_seed(42)

      它在内部设置了五种随机数种子:

      
      
      xxxxxxxxxx
      random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) # ^^ safe to call this function even if cuda is not available if is_tpu_available(): xm.set_rng_state(seed)
    • 设置 batch size:当使用 Accelerate 训练时,传递给 dataloaderbatch sizebatch size/GPU ,因此 final batch sizebatch size * device num

    • 设置学习率:学习率应该和 device num 成正比,如:

      
      
      xxxxxxxxxx
      learning_rate = 1e-3 accelerator = Accelerator() learning_rate *= accelerator.num_processes optimizer = AdamW(params=model.parameters(), lr=learning_rate)
  24. 梯度同步:在 DDP 中,PyTorch 在一些特定的点上进行进程间通信。然而在梯度累积时,你会累积 nloss 并跳过 .backward() 。这可能会导致明显的减速,因为所有的进程都需要与它们进行更多次的通信。

    可以通过 no_sync 上下文管理器来避免:

    
    
    xxxxxxxxxx
    ddp_model, dataloader = accelerator.prepare(model, dataloader) for index, batch in enumerate(dataloader): inputs, targets = batch # Trigger gradient synchronization on the last batch if index != (len(dataloader)-1): - with ddp_model.no_sync(): + with accelerator.no_sync(model): # Gradients only accumulate outputs = ddp_model(inputs) loss = loss_func(outputs, targets) accelerator.backward(loss) else: # Gradients finally sync outputs = ddp_model(inputs) loss = loss_func(outputs) accelerator.backward(loss)

    或者直接使用 accelerator.accumulate

    
    
    xxxxxxxxxx
    ddp_model, dataloader = accelerator.prepare(model, dataloader) for batch in dataloader: with accelerator.accumulate(model): optimizer.zero_grad() inputs, targets = batch outputs = model(inputs) loss = loss_function(outputs, targets) accelerator.backward(loss)
  25. 进程间同步:

    
    
    xxxxxxxxxx
    accelerator.wait_for_everyone()

    这将阻塞所有进程直到所有进程都达到该点。

    用途:

    • 加载数据集:

      
      
      xxxxxxxxxx
      with accelerator.main_process_first(): datasets = load_dataset("glue", "mrpc")

      这等价于:

      
      
      xxxxxxxxxx
      # First do something on the main process if accelerator.is_main_process: datasets = load_dataset("glue", "mrpc") else: accelerator.wait_for_everyone() # And then send it to the rest of them if not accelerator.is_main_process: datasets = load_dataset("glue", "mrpc") else: accelerator.wait_for_everyone()
    • 存取 state_dict

      
      
      xxxxxxxxxx
      if accelerator.is_main_process: model = accelerator.unwrap_model(model) torch.save(model.state_dict(), "weights.pth") with accelerator.main_process_first(): state = torch.load("weights.pth") model.load_state_dict(state)
    • global main 进程上 tokenizing,然后传播到每个 worker

      
      
      xxxxxxxxxx
      datasets = load_dataset("glue", "mrpc") with accelerator.main_process_first(): tokenized_datasets = datasets.map( tokenize_function, batched=True, remove_columns=["idx", "sentence1", "sentence2"], )

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文