• 教學 >
  • 分散式管線平行簡介
捷徑

分散式管線平行簡介

建立於:2024 年 7 月 9 日 | 最後更新:2024 年 12 月 12 日 | 最後驗證:2024 年 11 月 5 日

作者Howard Huang

注意

editgithub 中檢視和編輯本教學。

本教學使用 gpt 風格的 Transformer 模型來示範使用 torch.distributed.pipelining API 實作分散式管線平行。

您將學到什麼
  • 如何使用 torch.distributed.pipelining API

  • 如何將管線平行應用於 Transformer 模型

  • 如何在多個微批次上利用不同的排程

先決條件

設定

透過 torch.distributed.pipelining,我們將對模型的執行進行分割,並排程微批次的計算。我們將使用簡化版本的 Transformer 解碼器模型。該模型架構僅用於教學目的,包含多個 Transformer 解碼器層,因為我們想要示範如何將模型分割成不同的區塊。首先,讓我們定義模型

import torch
import torch.nn as nn
from dataclasses import dataclass

@dataclass
class ModelArgs:
   dim: int = 512
   n_layers: int = 8
   n_heads: int = 8
   vocab_size: int = 10000

class Transformer(nn.Module):
   def __init__(self, model_args: ModelArgs):
      super().__init__()

      self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)

      # Using a ModuleDict lets us delete layers witout affecting names,
      # ensuring checkpoints will correctly save and load.
      self.layers = torch.nn.ModuleDict()
      for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)

      self.norm = nn.LayerNorm(model_args.dim)
      self.output = nn.Linear(model_args.dim, model_args.vocab_size)

   def forward(self, tokens: torch.Tensor):
      # Handling layers being 'None' at runtime enables easy pipeline splitting
      h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

      for layer in self.layers.values():
            h = layer(h, h)

      h = self.norm(h) if self.norm else h
      output = self.output(h).clone() if self.output else h
      return output

然後,我們需要在腳本中匯入必要的函式庫,並初始化分散式訓練程序。在本例中,我們定義了一些全域變數,以便稍後在腳本中使用

import os
import torch.distributed as dist
from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe

global rank, device, pp_group, stage_index, num_stages
def init_distributed():
   global rank, device, pp_group, stage_index, num_stages
   rank = int(os.environ["LOCAL_RANK"])
   world_size = int(os.environ["WORLD_SIZE"])
   device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
   dist.init_process_group()

   # This group can be a sub-group in the N-D parallel case
   pp_group = dist.new_group()
   stage_index = rank
   num_stages = world_size

您應該對 rankworld_sizeinit_process_group() 代碼很熟悉,因為這些代碼通常用於所有分散式程式中。管線平行專用的全域變數包括 pp_group(將用於傳送/接收通訊的處理程序群組)、stage_index(在本例中,每個階段只有一個排名,因此索引等於排名)以及 num_stages(相當於 world_size)。

num_stages 用於設定管線平行排程中將使用的階段數。例如,對於 num_stages=4,微批次需要經過 4 次正向傳播和 4 次反向傳播才能完成。stage_index 對於框架了解如何在階段之間進行通訊是必要的。例如,對於第一階段(stage_index=0),它將使用來自資料載入器的資料,並且不需要接收來自任何先前對等點的資料來執行其計算。

步驟 1:分割 Transformer 模型

有兩種不同的分割模型的方法

第一種是手動模式,我們可以透過刪除模型屬性的一部分來手動建立模型的兩個實例。在本例中,對於兩個階段(2 個排名),模型被切成兩半。

def manual_model_split(model) -> PipelineStage:
   if stage_index == 0:
      # prepare the first stage model
      for i in range(4, 8):
            del model.layers[str(i)]
      model.norm = None
      model.output = None

   elif stage_index == 1:
      # prepare the second stage model
      for i in range(4):
            del model.layers[str(i)]
      model.tok_embeddings = None

   stage = PipelineStage(
      model,
      stage_index,
      num_stages,
      device,
   )
   return stage

正如我們所看到的,第一階段沒有層正規化或輸出層,它只包含前四個 Transformer 區塊。第二階段沒有輸入嵌入層,但包含輸出層和最後四個 Transformer 區塊。然後,該函數傳回目前排名的 PipelineStage

第二種方法是基於追蹤器的模式,它會根據 split_spec 參數自動分割模型。使用管線規格,我們可以指示 torch.distributed.pipelining 在哪裡分割模型。在以下程式碼區塊中,我們在第四個 Transformer 解碼器層之前進行分割,這與上面描述的手動分割方式相同。同樣地,我們可以在完成分割後呼叫 build_stage 來檢索 PipelineStage

步驟 2:定義主要執行

在 main 函數中,我們將建立階段應遵循的特定管線排程。torch.distributed.pipelining 支援多個排程,包括每個排名單階段排程 GPipe1F1B,以及每個排名多階段排程,例如 Interleaved1F1BLoopedBFS

if __name__ == "__main__":
   init_distributed()
   num_microbatches = 4
   model_args = ModelArgs()
   model = Transformer(model_args)

   # Dummy data
   x = torch.ones(32, 500, dtype=torch.long)
   y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
   example_input_microbatch = x.chunk(num_microbatches)[0]

   # Option 1: Manual model splitting
   stage = manual_model_split(model)

   # Option 2: Tracer model splitting
   # stage = tracer_model_split(model, example_input_microbatch)

   model.to(device)
   x = x.to(device)
   y = y.to(device)

   def tokenwise_loss_fn(outputs, targets):
      loss_fn = nn.CrossEntropyLoss()
      outputs = outputs.reshape(-1, model_args.vocab_size)
      targets = targets.reshape(-1)
      return loss_fn(outputs, targets)

   schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)

   if rank == 0:
      schedule.step(x)
   elif rank == 1:
      losses = []
      output = schedule.step(target=y, losses=losses)
      print(f"losses: {losses}")
   dist.destroy_process_group()

在上面的範例中,我們使用手動方法來分割模型,但可以取消註解程式碼以嘗試基於追蹤器的模型分割函數。在我們的排程中,我們需要傳入微批次的數量和用於評估目標的損失函數。

.step() 函數處理整個小批量,並根據先前傳遞的 n_microbatches 自動將其拆分為微批量。然後根據排程類別對微批量執行操作。在上面的範例中,我們使用 GPipe,它遵循一個簡單的全正向傳播然後全反向傳播的排程。從 rank 1 返回的輸出將與模型在單個 GPU 上執行且使用整個批次的輸出相同。同樣地,我們可以傳遞一個 losses 容器來儲存每個微批量的對應損失。

步驟 3:啟動分散式程序

最後,我們準備好執行腳本了。我們將使用 torchrun 創建一個單主機、2 程序的工作。我們的腳本已經以一種方式編寫,rank 0 執行 pipeline stage 0 所需的邏輯,而 rank 1 執行 pipeline stage 1 的邏輯。

torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py

結論

在本教學中,我們學習了如何使用 PyTorch 的 torch.distributed.pipelining API 實現分散式管線並行。我們探索了環境設置、定義 Transformer 模型以及對其進行分區以進行分散式訓練。我們討論了兩種模型分區方法,手動和基於追蹤器,並演示了如何在不同階段排程微批量的計算。最後,我們介紹了管線排程的執行以及使用 torchrun 啟動分散式程序。

其他資源

我們已成功將 torch.distributed.pipelining 整合到 torchtitan 儲存庫中。TorchTitan 是一個乾淨、最小的程式碼庫,用於使用原生 PyTorch 進行大規模 LLM 訓練。有關管線並行的生產級用法以及與其他分散式技術的組合,請參閱 TorchTitan 端到端 3D 並行範例

文件

存取 PyTorch 的完整開發人員文件

檢視文件

教學

取得適合初學者和進階開發人員的深入教學

檢視教學

資源

尋找開發資源並獲得問題解答

檢視資源