捷徑

管線並行

注意

torch.distributed.pipelining 目前處於 alpha 狀態並正在開發中。 API 變更是有可能的。 它從 PiPPy 專案遷移而來。

為什麼需要管線並行?

管線並行是深度學習的 基本 並行方法之一。 它允許對模型的 執行 進行分割,以便多個 微批次 可以同時執行模型程式碼的不同部分。 管線並行對於以下情況可能是一種有效的技術:

  • 大規模訓練

  • 頻寬受限的叢集

  • 大型模型推論

以上場景的一個共同點是,每個裝置的計算量無法隱藏傳統並行的通訊,例如 FSDP 的權重全收集。

torch.distributed.pipelining 是什麼?

雖然管線並行對於擴展很有前景,但它通常難以實現,因為它除了模型權重之外,還需要對模型的 執行進行分割。 執行分割通常需要對您的模型進行侵入式程式碼變更。 複雜性的另一個方面來自於 在分散式環境中排程微批次,並考慮到 資料流依賴性

pipelining 套件提供了一個工具組,能夠 自動 執行上述功能,方便在 一般 模型上實作管線並行 (pipeline parallelism)。

它包含兩個部分:一個 分割前端 和一個 分散式運行時。分割前端會直接接收你的模型程式碼,將其分割成「模型分割區 (model partitions)」,並捕捉資料流關係。分散式運行時則會在不同的裝置上平行執行管線階段 (pipeline stages),處理微批次分割 (micro-batch splitting)、排程 (scheduling)、通訊和梯度傳播等事宜。

總體而言,pipelining 套件提供以下功能:

  • 根據簡單的規範分割模型程式碼。

  • 豐富的管線排程支援,包括 GPipe、1F1B、交錯式 1F1B (Interleaved 1F1B) 和迴圈 BFS (Looped BFS),並提供編寫自定義排程的基礎架構。

  • 一流的跨主機管線並行支援,因為管線並行通常用於此(透過較慢的互連)。

  • 可與其他 PyTorch 並行技術組合使用,例如資料並行 (DDP、FSDP) 或張量並行。 TorchTitan 專案展示了 Llama 模型上的「3D 並行」應用。

步驟 1:建立 PipelineStage

在使用 PipelineSchedule 之前,我們需要建立 PipelineStage 物件,以封裝在該階段中執行的模型部分。 PipelineStage 負責分配通訊緩衝區,並建立 send/recv 操作以與其對等節點進行通訊。它管理中間緩衝區,例如尚未消耗的前向傳播輸出,並提供一個實用工具來執行階段模型的反向傳播。

PipelineStage 需要知道階段模型的輸入和輸出形狀,以便它可以正確地分配通訊緩衝區。形狀必須是靜態的,例如,在運行時,形狀不能一步一步地改變。如果運行時形狀與預期形狀不符,將會引發 PipeliningShapeError 類別。在與其他並行處理或應用混合精度組合時,必須考慮這些技術,以便 PipelineStage 知道階段模組在運行時輸出的正確形狀(和 dtype)。

使用者可以直接建構 PipelineStage 實例,方法是傳入一個 nn.Module,該模組代表應在該階段運行的模型部分。 這可能需要變更原始模型程式碼。 請參閱選項 1:手動分割模型中的範例。

或者,分割前端可以使用圖分割自動將您的模型分割成一系列 nn.Module。 此技術要求可以使用 torch.Export 追蹤模型。 將產生的 nn.Module 與其他並行技術組合使用是實驗性的,可能需要一些變通方法。 如果使用者無法輕易變更模型程式碼,則使用此前端可能更具吸引力。 有關更多資訊,請參閱選項 2:自動分割模型

步驟 2:使用 PipelineSchedule 執行

我們現在可以將 PipelineStage 連接到管線排程,並使用輸入資料執行排程。 這是一個 GPipe 範例

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

請注意,上述程式碼需要在每個 worker 上啟動,因此我們使用啟動器服務來啟動多個進程

torchrun --nproc_per_node=2 example.py

分割模型的選項

選項 1:手動分割模型

為了直接建構 PipelineStage,使用者負責提供一個單一的 nn.Module 實例,該實例擁有相關的 nn.Parametersnn.Buffers,並定義一個 forward() 方法來執行該階段的相關操作。 例如,Torchtitan 中定義的 Transformer 類別的精簡版本顯示了建構易於分割模型的模式。

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers without affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

以這種方式定義的模型可以很容易地針對每個階段進行配置,方法是首先初始化整個模型(使用 meta-device 以避免 OOM 錯誤),刪除該階段不需要的層,然後建立一個封裝模型的 PipelineStage。 例如

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
        input_args=example_input_microbatch,
    )

PipelineStage 需要一個範例參數 input_args,代表階段的運行時輸入,這將是一個微批次的輸入資料。 此參數會傳遞到階段模組的 forward 方法中,以確定通訊所需的輸入和輸出形狀。

當與其他資料或模型並行技術組合時,如果模型區塊的輸出形狀/dtype 受到影響,則可能還需要 output_args

選項 2:自動分割模型

如果您有一個完整的模型,並且不想花時間將其修改為一系列「模型分割區」,那麼 pipeline API 可以提供協助。 這是一個簡短的範例

class Model(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.emb = torch.nn.Embedding(10, 3)
        self.layers = torch.nn.ModuleList(
            Layer() for _ in range(2)
        )
        self.lm = LMHead()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x)
        for layer in self.layers:
            x = layer(x)
        x = self.lm(x)
        return x

如果我們列印模型,我們可以看到多個層級結構,這使得手動分割變得困難

Model(
  (emb): Embedding(10, 3)
  (layers): ModuleList(
    (0-1): 2 x Layer(
      (lin): Linear(in_features=3, out_features=3, bias=True)
    )
  )
  (lm): LMHead(
    (proj): Linear(in_features=3, out_features=3, bias=True)
  )
)

讓我們看看 pipeline API 如何運作

from torch.distributed.pipelining import pipeline, SplitPoint

# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])

pipe = pipeline(
    module=mod,
    mb_args=(x,),
    split_spec={
        "layers.1": SplitPoint.BEGINNING,
    }
)

pipeline API 根據 split_spec 分割您的模型,其中 SplitPoint.BEGINNING 代表在 forward 函數中執行特定子模組之前添加分割點,類似地,SplitPoint.END 代表在之後添加分割點。

如果我們 print(pipe),我們可以看見

GraphModule(
  (submod_0): GraphModule(
    (emb): InterpreterModule()
    (layers): Module(
      (0): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
  )
  (submod_1): GraphModule(
    (layers): Module(
      (1): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
    (lm): InterpreterModule(
      (proj): InterpreterModule()
    )
  )
)

def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return (submod_1,)

「模型分割區」由子模組 (submod_0, submod_1) 表示,每個子模組都使用原始模型操作、權重和層級結構重建。 此外,還重建了一個「根層級」 forward 函數,以捕捉這些分割區之間的資料流。 此類資料流稍後將由管線運行時以分散式方式重播。

Pipe 物件提供了一種檢索「模型分割區」的方法

stage_mod : nn.Module = pipe.get_stage_module(stage_idx)

傳回的 stage_mod 是一個 nn.Module,您可以使用它來建立最佳化器、儲存或載入檢查點或應用其他並行處理。

Pipe 還允許您在給定 ProcessGroup 的情況下在裝置上建立分散式階段運行時

stage = pipe.build_stage(stage_idx, device, group)

或者,如果您想在修改 stage_mod 之後才建置 stage runtime,您可以使用 build_stage API 的函數版本。例如:

from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel

dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)

注意

pipeline 前端使用追蹤器 (torch.export) 將您的模型擷取到單一圖形中。如果您的模型無法完整圖形化,您可以使用下方的手動前端。

Hugging Face 範例

在最初建立此套件的 PiPPy 儲存庫中,我們保留了基於未修改的 Hugging Face 模型的範例。 請參閱 examples/huggingface 目錄。

範例包括

技術深入探討

pipeline API 如何分割模型?

首先,pipeline API 通過追蹤模型將我們的模型轉換為有向無環圖 (DAG)。 它使用 torch.export (PyTorch 2 的完整圖形擷取工具) 來追蹤模型。

然後,它將 stage 所需的**操作和參數**分組到一個重建的子模組中:submod_0submod_1、...

與傳統的子模組存取方法 (例如 Module.children()) 不同,pipeline API 不僅切割模型的模組結構,還切割模型的 **forward** 函數。

這是必要的,因為像 Module.children() 這樣的模型結構僅在 Module.__init__() 期間捕獲資訊,而不捕獲有關 Module.forward() 的任何資訊。 換句話說,Module.children() 缺乏關於以下對 pipeline 至關重要的方面的資訊:

  • forward 中子模組的執行順序

  • 子模組之間的 activation 流程

  • 子模組之間是否存在任何函數式運算符(例如,reluadd 操作不會被 Module.children() 捕獲)。

相反,pipeline API 確保真正保留了 forward 行為。 它還捕獲分區之間的 activation 流程,幫助分散式 runtime 在沒有人工干預的情況下進行正確的發送/接收呼叫。

pipeline API 的另一個靈活性是,分割點可以在模型層次結構中的任意層級。 在分割的分區中,與該分區相關的原始模型層次結構將以零成本重建。 因此,指向子模組或參數的完全限定名稱 (FQNs) 仍然有效,並且依賴 FQNs 的服務 (例如 FSDP、TP 或檢查點) 仍然可以透過幾乎零程式碼變更來運行您的分割模組。

實作您自己的排程

您可以透過擴展以下兩個類別之一來實作您自己的 pipeline 排程

  • PipelineScheduleSingle

  • PipelineScheduleMulti

PipelineScheduleSingle 用於每個 rank 僅分配一個 stage 的排程。PipelineScheduleMulti 用於每個 rank 分配多個 stage 的排程。

例如,ScheduleGPipeSchedule1F1BPipelineScheduleSingle 的子類別。 而 ScheduleInterleaved1F1BScheduleLoopedBFSScheduleInterleavedZeroBubbleScheduleZBVZeroBubblePipelineScheduleMulti 的子類別。

日誌紀錄

您可以使用來自 [torch._logging](https://pytorch.dev.org.tw/docs/main/logging.html#module-torch._logging) 的 TORCH_LOGS 環境變數來開啟額外的日誌紀錄

  • TORCH_LOGS=+pp 將顯示 logging.DEBUG 訊息和所有高於它的層級。

  • TORCH_LOGS=pp 將顯示 logging.INFO 訊息和以上層級。

  • TORCH_LOGS=-pp 將顯示 logging.WARNING 訊息和以上層級。

API 參考

模型分割 API

以下 API 集將您的模型轉換為 pipeline 表示法。

class torch.distributed.pipelining.SplitPoint(value)[來源][來源]

一個列舉。

torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[原始碼][原始碼]

根據規格拆分模組。

詳情請參閱 Pipe

參數
  • module (Module) – 要拆分的模組。

  • mb_args (Tuple[Any, ...]) – 微批次 (micro-batch) 形式的範例位置輸入。

  • mb_kwargs (Optional[Dict[str, Any]]) – 微批次形式的範例關鍵字輸入。(預設值:None

  • split_spec (Optional[Dict[str, SplitPoint]]) – 使用子模組名稱作為拆分標記的字典。(預設值:None

  • split_policy (Optional[Callable[[GraphModule], GraphModule]]) – 用於拆分模組的策略。(預設值:None

返回類型

類別 Pipe 的管線表示。

class torch.distributed.pipelining.Pipe(split_gm, num_stages, has_loss_and_backward, loss_spec)[原始碼][原始碼]
torch.distributed.pipelining.pipe_split()[原始碼][原始碼]

pipe_split 是一個特殊運算子,用於標記模組中階段之間的邊界。它用於將模組拆分為多個階段。如果您的註釋模組以 eager 模式執行,它將是一個空操作 (no-op)。

範例

>>> def forward(self, x):
>>>     x = torch.mm(x, self.mm_param)
>>>     x = torch.relu(x)
>>>     pipe_split()
>>>     x = self.lin(x)
>>>     return x

上面的範例將被拆分為兩個階段。

微批次實用工具

class torch.distributed.pipelining.microbatch.TensorChunkSpec(split_dim)[原始碼][原始碼]

用於指定輸入分塊的類別

torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[原始碼][原始碼]

給定一系列 args 和 kwargs,根據它們各自的分塊規格將它們拆分成多個 chunks。

參數
  • args (Tuple[Any, ...]) – args 的元組

  • kwargs (Optional[Dict[str, Any]]) – kwargs 的字典

  • chunks (int) – 將 args 和 kwargs 拆分成的 chunk 數量

  • args_chunk_spec (Optional[Tuple[TensorChunkSpec, ...]]) – args 的分塊規格,與 args 的形狀相同

  • kwargs_chunk_spec (Optional[Dict[str, TensorChunkSpec]]) – kwargs 的分塊規格 (chunking specs),形狀與 kwargs 相同

回傳值

分片後的 args kwargs_split: 分片後的 kwargs 列表

返回類型

args_split

torch.distributed.pipelining.microbatch.merge_chunks(chunks, chunk_spec)[原始碼][原始碼]

給定一個分塊 (chunk) 列表,根據分塊規格 (chunk spec) 將它們合併為單一值。

參數
  • chunks (List[Any]) – 分塊 (chunk) 列表

  • chunk_spec – 分塊 (chunk) 的分塊規格 (chunking spec)

回傳值

合併後的值

返回類型

value

Pipeline Stages

class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args=None, output_args=None, group=None, dw_builder=None)[原始碼][原始碼]

一個類別,表示管線並行 (pipeline parallelism) 設定中的一個管線階段 (pipeline stage)。

PipelineStage 假設模型循序分割,也就是說,模型被分割成多個塊 (chunk),其中一個塊的輸出饋入下一個塊的輸入,沒有跳躍連接 (skip connection)。

PipelineStage 透過以線性順序將 stage0 的輸出傳播到 stage1,依此類推,自動執行執行時形狀/dtype 推斷。若要繞過形狀推斷,請將 input_argsoutput_args 傳遞給每個 PipelineStage 實例。

參數
  • submodule (nn.Module) – 此階段包裝的 PyTorch 模組。

  • stage_index (int) – 此階段的 ID。

  • num_stages (int) – 階段總數。

  • device (torch.device) – 此階段所在的裝置。

  • input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸入參數。

  • output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸出參數。

  • group (dist.ProcessGroup, optional) – 用於分散式訓練的進程組 (process group)。如果為 None,則使用預設組。

  • dw_builder (Optional[Callable[[], Callable[[...], None]]]) – TODO 清理註解

torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[原始碼][原始碼]

建立一個管線階段 (pipeline stage),給定一個要由此階段包裝的 stage_module 和管線資訊。

參數
  • stage_module (torch.nn.Module) – 要由此階段包裝的模組

  • stage_index (int) – 此階段在管線中的索引

  • pipe_info (PipeInfo) – 關於管線的資訊,可以使用 pipe.info() 檢索

  • device (torch.device) – 此階段要使用的裝置

  • group (Optional[dist.ProcessGroup]) – 此階段要使用的進程組 (process group)

回傳值

一個可以與 PipelineSchedules 一起運行的管線階段 (pipeline stage)。

返回類型

_PipelineStage

Pipeline Schedules

class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]

GPipe 排程。將以填充-排空(fill-drain)的方式處理所有微批次。

class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]

1F1B 排程。在穩定狀態下,將對微批次執行一次前向傳播和一次反向傳播。

class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]

交錯式 1F1B 排程。詳情請參閱 https://arxiv.org/pdf/2104.04473。在穩定狀態下,將對微批次執行一次前向傳播和一次反向傳播,並支援每個 rank 多個階段。當微批次準備好用於多個本地階段時,交錯式 1F1B 會優先處理較早的微批次(也稱為「深度優先」)。

此排程與原始論文非常相似。不同之處在於放寬了 num_microbatch % pp_size == 0 的要求。 使用 flex_pp 排程,我們將有 num_rounds = max(1, n_microbatches // pp_group_size),只要 n_microbatches % num_rounds 為 0 即可。 作為幾個範例,支援

  1. pp_group_size = 4, n_microbatches = 10。我們將有 num_rounds = 2 且 n_microbatches % 2 為 0。

  2. pp_group_size = 4, n_microbatches = 3。我們將有 num_rounds = 1 且 n_microbatches % 1 為 0。

class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None)[source][source]

廣度優先 Pipeline Parallelism。詳情請參閱 https://arxiv.org/abs/2211.05953。與交錯式 1F1B 類似,Looped BFS 支援每個 rank 多個階段。不同之處在於,當微批次準備好用於多個本地階段時,Loops BFS 會優先處理較早的階段,一次執行所有可用的微批次。

class torch.distributed.pipelining.schedules.ScheduleInterleavedZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source][source]

交錯式零氣泡(Zero Bubble)排程。詳情請參閱 https://arxiv.org/pdf/2401.10241。將在穩定狀態下對微批次的輸入執行一次前向傳播和一次反向傳播,並支援每個 rank 多個階段。使用權重的反向傳播來填補 pipeline 氣泡。

特別是,這實現了論文中的 ZB1P 排程。

class torch.distributed.pipelining.schedules.ScheduleZBVZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, stage_index_to_group_rank=None)[原始碼][原始碼]

Zero Bubble 排程 (ZBV 變體)。詳情請參閱 https://arxiv.org/pdf/2401.10241 第 6 節。

此排程要求每個 rank 恰好有兩個 stage。

此排程將在穩定狀態下對 microbatch 的輸入執行一次前向傳播和一次反向傳播,並支援每個 rank 多個 stage。使用相對於權重的反向傳播來填補 pipeline bubble。

僅當時間前向傳播 == 時間反向輸入 == 時間反向權重時,此 ZB-V 排程才具有「zero bubble」特性。實際上,對於真實模型而言,這不太可能成立,因此可以實作一個貪婪排程器來處理不相等/不平衡的時間。

class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[原始碼][原始碼]

單一 stage 排程的基底類別。實作 step 方法。衍生類別應實作 _step_microbatches

step(*args, target=None, losses=None, **kwargs)[原始碼][原始碼]

使用完整批次輸入執行 pipeline 排程的一次迭代。會自動將輸入分塊為 microbatch,並根據排程實作來處理 microbatch。

args: 模型的位置引數(如同非 pipeline 的情況)。 kwargs: 模型的關鍵字引數(如同非 pipeline 的情況)。 target: 損失函數的目標。 losses: 用於儲存每個 microbatch 的損失的列表。

class torch.distributed.pipelining.schedules.PipelineScheduleMulti(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, stage_index_to_group_rank=None, use_full_backward=None)[原始碼][原始碼]

多重 stage 排程的基底類別。實作 step 方法。

step(*args, target=None, losses=None, **kwargs)[原始碼][原始碼]

使用完整批次輸入執行 pipeline 排程的一次迭代。會自動將輸入分塊為 microbatch,並根據排程實作來處理 microbatch。

args: 模型的位置引數(如同非 pipeline 的情況)。 kwargs: 模型的關鍵字引數(如同非 pipeline 的情況)。 target: 損失函數的目標。 losses: 用於儲存每個 microbatch 的損失的列表。

文件

取得 PyTorch 的完整開發者文件

檢視文件

教學

取得適合初學者和進階開發者的深入教學

檢視教學

資源

尋找開發資源並獲得您的問題解答

檢視資源