• 文件 >
  • 分散式數據平行
快捷方式

分散式數據平行

警告

torch.nn.parallel.DistributedDataParallel 的實作會隨著時間演進。此設計筆記是基於 v1.4 的狀態撰寫。

torch.nn.parallel.DistributedDataParallel (DDP) 以透明方式執行分散式數據平行訓練。本頁描述其運作方式並揭露實作細節。

範例

讓我們從一個簡單的 torch.nn.parallel.DistributedDataParallel 範例開始。此範例使用 torch.nn.Linear 作為本地模型,並用 DDP 包裝它,然後在 DDP 模型上執行一次正向傳遞、一次反向傳遞和一個最佳化器步驟。之後,本地模型上的參數將被更新,並且不同進程上的所有模型應該完全相同。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    # Environment variables which need to be
    # set when using c10d's default "env"
    # initialization mode.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

DDP 適用於 TorchDynamo。當與 TorchDynamo 一起使用時,請在編譯模型之前套用 DDP 模型封裝器,以便 torchdynamo 可以根據 DDP bucket 大小套用 DDPOptimizer (圖形中斷優化)。(有關更多資訊,請參閱TorchDynamo DDPOptimizer。)

ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)

內部設計

本節深入探討 torch.nn.parallel.DistributedDataParallel 的底層運作方式,詳細說明單次迭代中的每個步驟。

  • 先決條件:DDP 依賴 c10d ProcessGroup 進行通訊。因此,應用程式必須在建構 DDP 之前建立 ProcessGroup 實例。

  • 建構:DDP 建構函式接受對本地模組的引用,並將 state_dict() 從 rank 0 的進程廣播到群組中的所有其他進程,以確保所有模型副本都從完全相同的狀態開始。然後,每個 DDP 進程都會建立一個本地 Reducer,稍後它將負責反向傳播期間的梯度同步。為了提高通訊效率,Reducer 將參數梯度組織成 bucket,並一次減少一個 bucket。可以通過在 DDP 建構函式中設置 bucket_cap_mb 參數來配置 Bucket 大小。從參數梯度到 bucket 的映射在建構時確定,基於 bucket 大小限制和參數大小。模型參數以(大致)與給定模型中的 Model.parameters() 相反的順序分配到 bucket 中。使用相反順序的原因是因為 DDP 期望梯度在反向傳播期間以大約該順序準備好。下圖顯示了一個例子。請注意,grad0grad1bucket1 中,而其他兩個梯度在 bucket0 中。當然,這種假設可能並不總是正確的,當發生這種情況時,可能會損害 DDP 的反向速度,因為 Reducer 無法儘早啟動通訊。除了 bucketing 之外,Reducer 還在建構期間註冊 autograd hook,每個參數一個 hook。當梯度準備好時,這些 hook 將在反向傳播期間被觸發。

  • 前向傳播:DDP 獲取輸入並將其傳遞到本地模型,然後,如果 find_unused_parameters 設置為 True,則分析來自本地模型的輸出。此模式允許在模型的子圖上運行反向傳播,DDP 通過從模型輸出遍歷 autograd 圖並將所有未使用的參數標記為已準備好進行縮減,從而找出哪些參數參與了反向傳播。在反向傳播期間,Reducer 只會等待未準備好的參數,但它仍會減少所有 bucket。將參數梯度標記為已準備好並不能幫助 DDP 跳過 bucket,但它可以防止 DDP 在反向傳播期間永遠等待不存在的梯度。請注意,遍歷 autograd 圖會引入額外的開銷,因此應用程式應僅在必要時才將 find_unused_parameters 設置為 True

  • 反向傳播backward() 函數直接在 loss Tensor 上調用,這超出了 DDP 的控制範圍,DDP 使用在建構時註冊的 autograd hook 來觸發梯度同步。當一個梯度準備好時,其在該梯度累加器上的相應 DDP hook 將觸發,然後 DDP 將該參數梯度標記為已準備好進行縮減。當一個 bucket 中的梯度都準備好時,Reducer 會在該 bucket 上啟動異步 allreduce,以計算所有進程中梯度的平均值。當所有 bucket 都準備好時,Reducer 將阻止等待所有 allreduce 操作完成。完成後,平均梯度將被寫入所有參數的 param.grad 字段。因此,在反向傳播之後,不同 DDP 進程上相同對應參數的 grad 字段應該是相同的。

  • 優化器步驟:從優化器的角度來看,它正在優化本地模型。所有 DDP 進程上的模型副本都可以保持同步,因為它們都從相同的狀態開始,並且在每次迭代中都具有相同的平均梯度。

ddp_grad_sync.png

注意

DDP 需要所有進程上的 Reducer 實例以完全相同的順序調用 allreduce,這是通過始終以 bucket 索引順序而不是實際的 bucket 就緒順序運行 allreduce 來完成的。跨進程的不匹配的 allreduce 順序可能導致錯誤的結果或 DDP 反向掛起。

實作

以下是指向 DDP 實作元件的指標。堆疊圖顯示了程式碼的結構。

ProcessGroup

  • ProcessGroup.hpp:包含所有進程組實作的抽象 API。c10d 庫提供了 3 種現成的實作,即 ProcessGroupGlooProcessGroupNCCLProcessGroupMPIDistributedDataParallel 使用 ProcessGroup::broadcast() 在初始化期間將模型狀態從 rank 0 的進程發送到其他進程,並使用 ProcessGroup::allreduce() 對梯度求和。

  • Store.hpp:協助進程組實例的 rendezvous 服務以找到彼此。

DistributedDataParallel

  • distributed.py:是 DDP 的 Python 進入點。它實現了初始化步驟以及 nn.parallel.DistributedDataParallel 模組的 forward 函數,該函數會呼叫 C++ 函式庫。它的 _sync_param 函數在一個 DDP 處理程序在多個裝置上工作時執行進程內參數同步,並且它還將模型緩衝區從 rank 0 的處理程序廣播到所有其他處理程序。進程間參數同步發生在 Reducer.cpp 中。

  • comm.h:實現了 coalesced broadcast 輔助函數,該函數被調用以在初始化期間廣播模型狀態,並在 forward 傳遞之前同步模型緩衝區。

  • reducer.h:為 backward 傳遞中的梯度同步提供核心實現。它有三個進入點函數

    • Reducer:建構函式在 distributed.py 中被呼叫,它將 Reducer::autograd_hook() 註冊到梯度累加器。

    • autograd_hook() 函數將在梯度準備就緒時由 autograd 引擎調用。

    • prepare_for_backward()distributed.py 中 DDP forward 傳遞結束時被呼叫。當 DDP 建構函式中 find_unused_parameters 設定為 True 時,它會遍歷 autograd 圖以尋找未使用的參數。

ddp_code.png

TorchDynamo DDPOptimizer

DDP 的性能優勢來自於在 backward 期間將 allreduce collectives 與計算重疊。當 AotAutograd 與 TorchDynamo 一起使用,編譯整個 forward 和整個 backward 圖時,會阻止這種重疊,因為 allreduce 操作是在整個最佳化 backward 計算完成_後_由 autograd hooks 啟動的。

TorchDynamo 的 DDPOptimizer 透過在 backward 期間於 DDP 的 allreduce buckets 的邏輯邊界處中斷 forward 圖來提供幫助。注意:目標是在 backward 期間中斷圖,最簡單的實現是中斷 forward 圖,然後對每個部分呼叫 AotAutograd 並進行編譯。這允許 DDP 的 allreduce hooks 在 backward 的各個部分之間觸發,並安排通訊與計算重疊。

有關更深入的解釋和實驗結果,請參閱此部落格文章,或閱讀 torch/_dynamo/optimizations/distributed.py 上的文檔和程式碼。

要除錯 DDPOptimizer,請設定 TORCH_LOGS='ddp_graphs' 以取得完整的圖形傾印。對於沒有圖形的日誌,將 'dynamo'、'distributed' 或 'dist_ddp' 中的任何一個新增到 TORCH_LOGS(以取得有關 bucket 邊界的基本資訊)。要停用 DDPOptimizer,請設定 torch._dynamo.config.optimize_ddp=False。DDP 和 TorchDynamo 仍然可以在沒有 DDPOptimizer 的情況下正確工作,但性能會降低。

文件

訪問 PyTorch 的綜合開發者文檔

查看文檔

教學課程

獲取初學者和高級開發人員的深入教學課程

查看教學課程

資源

查找開發資源並獲得問題解答

查看資源