分散式管線平行簡介¶
建立於:2024 年 7 月 9 日 | 最後更新:2024 年 12 月 12 日 | 最後驗證:2024 年 11 月 5 日
作者:Howard Huang
注意
在 github 中檢視和編輯本教學。
本教學使用 gpt 風格的 Transformer 模型來示範使用 torch.distributed.pipelining API 實作分散式管線平行。
如何使用
torch.distributed.pipelining
API如何將管線平行應用於 Transformer 模型
如何在多個微批次上利用不同的排程
熟悉 PyTorch 中的 基本分散式訓練
設定¶
透過 torch.distributed.pipelining
,我們將對模型的執行進行分割,並排程微批次的計算。我們將使用簡化版本的 Transformer 解碼器模型。該模型架構僅用於教學目的,包含多個 Transformer 解碼器層,因為我們想要示範如何將模型分割成不同的區塊。首先,讓我們定義模型
import torch
import torch.nn as nn
from dataclasses import dataclass
@dataclass
class ModelArgs:
dim: int = 512
n_layers: int = 8
n_heads: int = 8
vocab_size: int = 10000
class Transformer(nn.Module):
def __init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)
# Using a ModuleDict lets us delete layers witout affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)
self.norm = nn.LayerNorm(model_args.dim)
self.output = nn.Linear(model_args.dim, model_args.vocab_size)
def forward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, h)
h = self.norm(h) if self.norm else h
output = self.output(h).clone() if self.output else h
return output
然後,我們需要在腳本中匯入必要的函式庫,並初始化分散式訓練程序。在本例中,我們定義了一些全域變數,以便稍後在腳本中使用
import os
import torch.distributed as dist
from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe
global rank, device, pp_group, stage_index, num_stages
def init_distributed():
global rank, device, pp_group, stage_index, num_stages
rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
dist.init_process_group()
# This group can be a sub-group in the N-D parallel case
pp_group = dist.new_group()
stage_index = rank
num_stages = world_size
您應該對 rank
、world_size
和 init_process_group()
代碼很熟悉,因為這些代碼通常用於所有分散式程式中。管線平行專用的全域變數包括 pp_group
(將用於傳送/接收通訊的處理程序群組)、stage_index
(在本例中,每個階段只有一個排名,因此索引等於排名)以及 num_stages
(相當於 world_size)。
num_stages
用於設定管線平行排程中將使用的階段數。例如,對於 num_stages=4
,微批次需要經過 4 次正向傳播和 4 次反向傳播才能完成。stage_index
對於框架了解如何在階段之間進行通訊是必要的。例如,對於第一階段(stage_index=0
),它將使用來自資料載入器的資料,並且不需要接收來自任何先前對等點的資料來執行其計算。
步驟 1:分割 Transformer 模型¶
有兩種不同的分割模型的方法
第一種是手動模式,我們可以透過刪除模型屬性的一部分來手動建立模型的兩個實例。在本例中,對於兩個階段(2 個排名),模型被切成兩半。
def manual_model_split(model) -> PipelineStage:
if stage_index == 0:
# prepare the first stage model
for i in range(4, 8):
del model.layers[str(i)]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
for i in range(4):
del model.layers[str(i)]
model.tok_embeddings = None
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
)
return stage
正如我們所看到的,第一階段沒有層正規化或輸出層,它只包含前四個 Transformer 區塊。第二階段沒有輸入嵌入層,但包含輸出層和最後四個 Transformer 區塊。然後,該函數傳回目前排名的 PipelineStage
。
第二種方法是基於追蹤器的模式,它會根據 split_spec
參數自動分割模型。使用管線規格,我們可以指示 torch.distributed.pipelining
在哪裡分割模型。在以下程式碼區塊中,我們在第四個 Transformer 解碼器層之前進行分割,這與上面描述的手動分割方式相同。同樣地,我們可以在完成分割後呼叫 build_stage
來檢索 PipelineStage
。
步驟 2:定義主要執行¶
在 main 函數中,我們將建立階段應遵循的特定管線排程。torch.distributed.pipelining
支援多個排程,包括每個排名單階段排程 GPipe
和 1F1B
,以及每個排名多階段排程,例如 Interleaved1F1B
和 LoopedBFS
。
if __name__ == "__main__":
init_distributed()
num_microbatches = 4
model_args = ModelArgs()
model = Transformer(model_args)
# Dummy data
x = torch.ones(32, 500, dtype=torch.long)
y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
example_input_microbatch = x.chunk(num_microbatches)[0]
# Option 1: Manual model splitting
stage = manual_model_split(model)
# Option 2: Tracer model splitting
# stage = tracer_model_split(model, example_input_microbatch)
model.to(device)
x = x.to(device)
y = y.to(device)
def tokenwise_loss_fn(outputs, targets):
loss_fn = nn.CrossEntropyLoss()
outputs = outputs.reshape(-1, model_args.vocab_size)
targets = targets.reshape(-1)
return loss_fn(outputs, targets)
schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)
if rank == 0:
schedule.step(x)
elif rank == 1:
losses = []
output = schedule.step(target=y, losses=losses)
print(f"losses: {losses}")
dist.destroy_process_group()
在上面的範例中,我們使用手動方法來分割模型,但可以取消註解程式碼以嘗試基於追蹤器的模型分割函數。在我們的排程中,我們需要傳入微批次的數量和用於評估目標的損失函數。
.step()
函數處理整個小批量,並根據先前傳遞的 n_microbatches
自動將其拆分為微批量。然後根據排程類別對微批量執行操作。在上面的範例中,我們使用 GPipe,它遵循一個簡單的全正向傳播然後全反向傳播的排程。從 rank 1 返回的輸出將與模型在單個 GPU 上執行且使用整個批次的輸出相同。同樣地,我們可以傳遞一個 losses
容器來儲存每個微批量的對應損失。
步驟 3:啟動分散式程序¶
最後,我們準備好執行腳本了。我們將使用 torchrun
創建一個單主機、2 程序的工作。我們的腳本已經以一種方式編寫,rank 0 執行 pipeline stage 0 所需的邏輯,而 rank 1 執行 pipeline stage 1 的邏輯。
torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py
結論¶
在本教學中,我們學習了如何使用 PyTorch 的 torch.distributed.pipelining
API 實現分散式管線並行。我們探索了環境設置、定義 Transformer 模型以及對其進行分區以進行分散式訓練。我們討論了兩種模型分區方法,手動和基於追蹤器,並演示了如何在不同階段排程微批量的計算。最後,我們介紹了管線排程的執行以及使用 torchrun
啟動分散式程序。
其他資源¶
我們已成功將 torch.distributed.pipelining
整合到 torchtitan 儲存庫中。TorchTitan 是一個乾淨、最小的程式碼庫,用於使用原生 PyTorch 進行大規模 LLM 訓練。有關管線並行的生產級用法以及與其他分散式技術的組合,請參閱 TorchTitan 端到端 3D 並行範例。