管線並行¶
注意
torch.distributed.pipelining
目前處於 alpha 狀態並正在開發中。 API 變更是有可能的。 它從 PiPPy 專案遷移而來。
為什麼需要管線並行?¶
管線並行是深度學習的 基本 並行方法之一。 它允許對模型的 執行 進行分割,以便多個 微批次 可以同時執行模型程式碼的不同部分。 管線並行對於以下情況可能是一種有效的技術:
大規模訓練
頻寬受限的叢集
大型模型推論
以上場景的一個共同點是,每個裝置的計算量無法隱藏傳統並行的通訊,例如 FSDP 的權重全收集。
torch.distributed.pipelining
是什麼?¶
雖然管線並行對於擴展很有前景,但它通常難以實現,因為它除了模型權重之外,還需要對模型的 執行進行分割。 執行分割通常需要對您的模型進行侵入式程式碼變更。 複雜性的另一個方面來自於 在分散式環境中排程微批次,並考慮到 資料流依賴性。
pipelining
套件提供了一個工具組,能夠 自動 執行上述功能,方便在 一般 模型上實作管線並行 (pipeline parallelism)。
它包含兩個部分:一個 分割前端 和一個 分散式運行時。分割前端會直接接收你的模型程式碼,將其分割成「模型分割區 (model partitions)」,並捕捉資料流關係。分散式運行時則會在不同的裝置上平行執行管線階段 (pipeline stages),處理微批次分割 (micro-batch splitting)、排程 (scheduling)、通訊和梯度傳播等事宜。
總體而言,pipelining
套件提供以下功能:
根據簡單的規範分割模型程式碼。
豐富的管線排程支援,包括 GPipe、1F1B、交錯式 1F1B (Interleaved 1F1B) 和迴圈 BFS (Looped BFS),並提供編寫自定義排程的基礎架構。
一流的跨主機管線並行支援,因為管線並行通常用於此(透過較慢的互連)。
可與其他 PyTorch 並行技術組合使用,例如資料並行 (DDP、FSDP) 或張量並行。 TorchTitan 專案展示了 Llama 模型上的「3D 並行」應用。
步驟 1:建立 PipelineStage
¶
在使用 PipelineSchedule
之前,我們需要建立 PipelineStage
物件,以封裝在該階段中執行的模型部分。 PipelineStage
負責分配通訊緩衝區,並建立 send/recv 操作以與其對等節點進行通訊。它管理中間緩衝區,例如尚未消耗的前向傳播輸出,並提供一個實用工具來執行階段模型的反向傳播。
PipelineStage
需要知道階段模型的輸入和輸出形狀,以便它可以正確地分配通訊緩衝區。形狀必須是靜態的,例如,在運行時,形狀不能一步一步地改變。如果運行時形狀與預期形狀不符,將會引發 PipeliningShapeError
類別。在與其他並行處理或應用混合精度組合時,必須考慮這些技術,以便 PipelineStage
知道階段模組在運行時輸出的正確形狀(和 dtype)。
使用者可以直接建構 PipelineStage
實例,方法是傳入一個 nn.Module
,該模組代表應在該階段運行的模型部分。 這可能需要變更原始模型程式碼。 請參閱選項 1:手動分割模型中的範例。
或者,分割前端可以使用圖分割自動將您的模型分割成一系列 nn.Module
。 此技術要求可以使用 torch.Export
追蹤模型。 將產生的 nn.Module
與其他並行技術組合使用是實驗性的,可能需要一些變通方法。 如果使用者無法輕易變更模型程式碼,則使用此前端可能更具吸引力。 有關更多資訊,請參閱選項 2:自動分割模型。
步驟 2:使用 PipelineSchedule
執行¶
我們現在可以將 PipelineStage
連接到管線排程,並使用輸入資料執行排程。 這是一個 GPipe 範例
from torch.distributed.pipelining import ScheduleGPipe
# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)
# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)
# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
schedule.step(x)
else:
output = schedule.step()
請注意,上述程式碼需要在每個 worker 上啟動,因此我們使用啟動器服務來啟動多個進程
torchrun --nproc_per_node=2 example.py
分割模型的選項¶
選項 1:手動分割模型¶
為了直接建構 PipelineStage
,使用者負責提供一個單一的 nn.Module
實例,該實例擁有相關的 nn.Parameters
和 nn.Buffers
,並定義一個 forward()
方法來執行該階段的相關操作。 例如,Torchtitan 中定義的 Transformer 類別的精簡版本顯示了建構易於分割模型的模式。
class Transformer(nn.Module):
def __init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(...)
# Using a ModuleDict lets us delete layers without affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = TransformerBlock(...)
self.output = nn.Linear(...)
def forward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, self.freqs_cis)
h = self.norm(h) if self.norm else h
output = self.output(h).float() if self.output else h
return output
以這種方式定義的模型可以很容易地針對每個階段進行配置,方法是首先初始化整個模型(使用 meta-device 以避免 OOM 錯誤),刪除該階段不需要的層,然後建立一個封裝模型的 PipelineStage。 例如
with torch.device("meta"):
assert num_stages == 2, "This is a simple 2-stage example"
# we construct the entire model, then delete the parts we do not need for this stage
# in practice, this can be done using a helper function that automatically divides up layers across stages.
model = Transformer()
if stage_index == 0:
# prepare the first stage model
del model.layers["1"]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
model.tok_embeddings = None
del model.layers["0"]
from torch.distributed.pipelining import PipelineStage
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
input_args=example_input_microbatch,
)
PipelineStage
需要一個範例參數 input_args
,代表階段的運行時輸入,這將是一個微批次的輸入資料。 此參數會傳遞到階段模組的 forward 方法中,以確定通訊所需的輸入和輸出形狀。
當與其他資料或模型並行技術組合時,如果模型區塊的輸出形狀/dtype 受到影響,則可能還需要 output_args
。
選項 2:自動分割模型¶
如果您有一個完整的模型,並且不想花時間將其修改為一系列「模型分割區」,那麼 pipeline
API 可以提供協助。 這是一個簡短的範例
class Model(torch.nn.Module):
def __init__(self) -> None:
super().__init__()
self.emb = torch.nn.Embedding(10, 3)
self.layers = torch.nn.ModuleList(
Layer() for _ in range(2)
)
self.lm = LMHead()
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.emb(x)
for layer in self.layers:
x = layer(x)
x = self.lm(x)
return x
如果我們列印模型,我們可以看到多個層級結構,這使得手動分割變得困難
Model(
(emb): Embedding(10, 3)
(layers): ModuleList(
(0-1): 2 x Layer(
(lin): Linear(in_features=3, out_features=3, bias=True)
)
)
(lm): LMHead(
(proj): Linear(in_features=3, out_features=3, bias=True)
)
)
讓我們看看 pipeline
API 如何運作
from torch.distributed.pipelining import pipeline, SplitPoint
# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])
pipe = pipeline(
module=mod,
mb_args=(x,),
split_spec={
"layers.1": SplitPoint.BEGINNING,
}
)
pipeline
API 根據 split_spec
分割您的模型,其中 SplitPoint.BEGINNING
代表在 forward
函數中執行特定子模組之前添加分割點,類似地,SplitPoint.END
代表在之後添加分割點。
如果我們 print(pipe)
,我們可以看見
GraphModule(
(submod_0): GraphModule(
(emb): InterpreterModule()
(layers): Module(
(0): InterpreterModule(
(lin): InterpreterModule()
)
)
)
(submod_1): GraphModule(
(layers): Module(
(1): InterpreterModule(
(lin): InterpreterModule()
)
)
(lm): InterpreterModule(
(proj): InterpreterModule()
)
)
)
def forward(self, x):
submod_0 = self.submod_0(x); x = None
submod_1 = self.submod_1(submod_0); submod_0 = None
return (submod_1,)
「模型分割區」由子模組 (submod_0
, submod_1
) 表示,每個子模組都使用原始模型操作、權重和層級結構重建。 此外,還重建了一個「根層級」 forward
函數,以捕捉這些分割區之間的資料流。 此類資料流稍後將由管線運行時以分散式方式重播。
Pipe
物件提供了一種檢索「模型分割區」的方法
stage_mod : nn.Module = pipe.get_stage_module(stage_idx)
傳回的 stage_mod
是一個 nn.Module
,您可以使用它來建立最佳化器、儲存或載入檢查點或應用其他並行處理。
Pipe
還允許您在給定 ProcessGroup
的情況下在裝置上建立分散式階段運行時
stage = pipe.build_stage(stage_idx, device, group)
或者,如果您想在修改 stage_mod
之後才建置 stage runtime,您可以使用 build_stage
API 的函數版本。例如:
from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel
dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)
注意
pipeline
前端使用追蹤器 (torch.export
) 將您的模型擷取到單一圖形中。如果您的模型無法完整圖形化,您可以使用下方的手動前端。
Hugging Face 範例¶
在最初建立此套件的 PiPPy 儲存庫中,我們保留了基於未修改的 Hugging Face 模型的範例。 請參閱 examples/huggingface 目錄。
範例包括
技術深入探討¶
pipeline
API 如何分割模型?¶
首先,pipeline
API 通過追蹤模型將我們的模型轉換為有向無環圖 (DAG)。 它使用 torch.export
(PyTorch 2 的完整圖形擷取工具) 來追蹤模型。
然後,它將 stage 所需的**操作和參數**分組到一個重建的子模組中:submod_0
、submod_1
、...
與傳統的子模組存取方法 (例如 Module.children()
) 不同,pipeline
API 不僅切割模型的模組結構,還切割模型的 **forward** 函數。
這是必要的,因為像 Module.children()
這樣的模型結構僅在 Module.__init__()
期間捕獲資訊,而不捕獲有關 Module.forward()
的任何資訊。 換句話說,Module.children()
缺乏關於以下對 pipeline 至關重要的方面的資訊:
forward
中子模組的執行順序子模組之間的 activation 流程
子模組之間是否存在任何函數式運算符(例如,
relu
或add
操作不會被Module.children()
捕獲)。
相反,pipeline
API 確保真正保留了 forward
行為。 它還捕獲分區之間的 activation 流程,幫助分散式 runtime 在沒有人工干預的情況下進行正確的發送/接收呼叫。
pipeline
API 的另一個靈活性是,分割點可以在模型層次結構中的任意層級。 在分割的分區中,與該分區相關的原始模型層次結構將以零成本重建。 因此,指向子模組或參數的完全限定名稱 (FQNs) 仍然有效,並且依賴 FQNs 的服務 (例如 FSDP、TP 或檢查點) 仍然可以透過幾乎零程式碼變更來運行您的分割模組。
實作您自己的排程¶
您可以透過擴展以下兩個類別之一來實作您自己的 pipeline 排程
PipelineScheduleSingle
PipelineScheduleMulti
PipelineScheduleSingle
用於每個 rank 僅分配一個 stage 的排程。PipelineScheduleMulti
用於每個 rank 分配多個 stage 的排程。
例如,ScheduleGPipe
和 Schedule1F1B
是 PipelineScheduleSingle
的子類別。 而 ScheduleInterleaved1F1B
、ScheduleLoopedBFS
、ScheduleInterleavedZeroBubble
和 ScheduleZBVZeroBubble
是 PipelineScheduleMulti
的子類別。
日誌紀錄¶
您可以使用來自 [torch._logging](https://pytorch.dev.org.tw/docs/main/logging.html#module-torch._logging) 的 TORCH_LOGS 環境變數來開啟額外的日誌紀錄
TORCH_LOGS=+pp 將顯示 logging.DEBUG 訊息和所有高於它的層級。
TORCH_LOGS=pp 將顯示 logging.INFO 訊息和以上層級。
TORCH_LOGS=-pp 將顯示 logging.WARNING 訊息和以上層級。
API 參考¶
模型分割 API¶
以下 API 集將您的模型轉換為 pipeline 表示法。
- torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[原始碼][原始碼]¶
根據規格拆分模組。
詳情請參閱 Pipe。
- 參數
module (Module) – 要拆分的模組。
mb_kwargs (Optional[Dict[str, Any]]) – 微批次形式的範例關鍵字輸入。(預設值:None)
split_spec (Optional[Dict[str, SplitPoint]]) – 使用子模組名稱作為拆分標記的字典。(預設值:None)
split_policy (Optional[Callable[[GraphModule], GraphModule]]) – 用於拆分模組的策略。(預設值:None)
- 返回類型
類別 Pipe 的管線表示。
微批次實用工具¶
- torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[原始碼][原始碼]¶
給定一系列 args 和 kwargs,根據它們各自的分塊規格將它們拆分成多個 chunks。
- 參數
chunks (int) – 將 args 和 kwargs 拆分成的 chunk 數量
args_chunk_spec (Optional[Tuple[TensorChunkSpec, ...]]) – args 的分塊規格,與 args 的形狀相同
kwargs_chunk_spec (Optional[Dict[str, TensorChunkSpec]]) – kwargs 的分塊規格 (chunking specs),形狀與 kwargs 相同
- 回傳值
分片後的 args kwargs_split: 分片後的 kwargs 列表
- 返回類型
args_split
Pipeline Stages¶
- class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args=None, output_args=None, group=None, dw_builder=None)[原始碼][原始碼]¶
一個類別,表示管線並行 (pipeline parallelism) 設定中的一個管線階段 (pipeline stage)。
PipelineStage 假設模型循序分割,也就是說,模型被分割成多個塊 (chunk),其中一個塊的輸出饋入下一個塊的輸入,沒有跳躍連接 (skip connection)。
PipelineStage 透過以線性順序將 stage0 的輸出傳播到 stage1,依此類推,自動執行執行時形狀/dtype 推斷。若要繞過形狀推斷,請將 input_args 和 output_args 傳遞給每個 PipelineStage 實例。
- 參數
submodule (nn.Module) – 此階段包裝的 PyTorch 模組。
stage_index (int) – 此階段的 ID。
num_stages (int) – 階段總數。
device (torch.device) – 此階段所在的裝置。
input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸入參數。
output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸出參數。
group (dist.ProcessGroup, optional) – 用於分散式訓練的進程組 (process group)。如果為 None,則使用預設組。
dw_builder (Optional[Callable[[], Callable[[...], None]]]) – TODO 清理註解
- torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[原始碼][原始碼]¶
建立一個管線階段 (pipeline stage),給定一個要由此階段包裝的 stage_module 和管線資訊。
- 參數
stage_module (torch.nn.Module) – 要由此階段包裝的模組
stage_index (int) – 此階段在管線中的索引
pipe_info (PipeInfo) – 關於管線的資訊,可以使用 pipe.info() 檢索
device (torch.device) – 此階段要使用的裝置
group (Optional[dist.ProcessGroup]) – 此階段要使用的進程組 (process group)
- 回傳值
一個可以與 PipelineSchedules 一起運行的管線階段 (pipeline stage)。
- 返回類型
_PipelineStage
Pipeline Schedules¶
- class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]¶
GPipe 排程。將以填充-排空(fill-drain)的方式處理所有微批次。
- class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]¶
1F1B 排程。在穩定狀態下,將對微批次執行一次前向傳播和一次反向傳播。
- class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]¶
交錯式 1F1B 排程。詳情請參閱 https://arxiv.org/pdf/2104.04473。在穩定狀態下,將對微批次執行一次前向傳播和一次反向傳播,並支援每個 rank 多個階段。當微批次準備好用於多個本地階段時,交錯式 1F1B 會優先處理較早的微批次(也稱為「深度優先」)。
此排程與原始論文非常相似。不同之處在於放寬了 num_microbatch % pp_size == 0 的要求。 使用 flex_pp 排程,我們將有 num_rounds = max(1, n_microbatches // pp_group_size),只要 n_microbatches % num_rounds 為 0 即可。 作為幾個範例,支援
pp_group_size = 4, n_microbatches = 10。我們將有 num_rounds = 2 且 n_microbatches % 2 為 0。
pp_group_size = 4, n_microbatches = 3。我們將有 num_rounds = 1 且 n_microbatches % 1 為 0。
- class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None)[source][source]¶
廣度優先 Pipeline Parallelism。詳情請參閱 https://arxiv.org/abs/2211.05953。與交錯式 1F1B 類似,Looped BFS 支援每個 rank 多個階段。不同之處在於,當微批次準備好用於多個本地階段時,Loops BFS 會優先處理較早的階段,一次執行所有可用的微批次。
- class torch.distributed.pipelining.schedules.ScheduleInterleavedZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]¶
交錯式零氣泡(Zero Bubble)排程。詳情請參閱 https://arxiv.org/pdf/2401.10241。將在穩定狀態下對微批次的輸入執行一次前向傳播和一次反向傳播,並支援每個 rank 多個階段。使用權重的反向傳播來填補 pipeline 氣泡。
特別是,這實現了論文中的 ZB1P 排程。
- class torch.distributed.pipelining.schedules.ScheduleZBVZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, stage_index_to_group_rank=None)[原始碼][原始碼]¶
Zero Bubble 排程 (ZBV 變體)。詳情請參閱 https://arxiv.org/pdf/2401.10241 第 6 節。
此排程要求每個 rank 恰好有兩個 stage。
此排程將在穩定狀態下對 microbatch 的輸入執行一次前向傳播和一次反向傳播,並支援每個 rank 多個 stage。使用相對於權重的反向傳播來填補 pipeline bubble。
僅當時間前向傳播 == 時間反向輸入 == 時間反向權重時,此 ZB-V 排程才具有「zero bubble」特性。實際上,對於真實模型而言,這不太可能成立,因此可以實作一個貪婪排程器來處理不相等/不平衡的時間。
- class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[原始碼][原始碼]¶
單一 stage 排程的基底類別。實作 step 方法。衍生類別應實作 _step_microbatches。