分散式數據平行¶
警告
torch.nn.parallel.DistributedDataParallel
的實作會隨著時間演進。此設計筆記是基於 v1.4 的狀態撰寫。
torch.nn.parallel.DistributedDataParallel
(DDP) 以透明方式執行分散式數據平行訓練。本頁描述其運作方式並揭露實作細節。
範例¶
讓我們從一個簡單的 torch.nn.parallel.DistributedDataParallel
範例開始。此範例使用 torch.nn.Linear
作為本地模型,並用 DDP 包裝它,然後在 DDP 模型上執行一次正向傳遞、一次反向傳遞和一個最佳化器步驟。之後,本地模型上的參數將被更新,並且不同進程上的所有模型應該完全相同。
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP
def example(rank, world_size):
# create default process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
# create local model
model = nn.Linear(10, 10).to(rank)
# construct DDP model
ddp_model = DDP(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
# forward pass
outputs = ddp_model(torch.randn(20, 10).to(rank))
labels = torch.randn(20, 10).to(rank)
# backward pass
loss_fn(outputs, labels).backward()
# update parameters
optimizer.step()
def main():
world_size = 2
mp.spawn(example,
args=(world_size,),
nprocs=world_size,
join=True)
if __name__=="__main__":
# Environment variables which need to be
# set when using c10d's default "env"
# initialization mode.
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
main()
DDP 適用於 TorchDynamo。當與 TorchDynamo 一起使用時,請在編譯模型之前套用 DDP 模型封裝器,以便 torchdynamo 可以根據 DDP bucket 大小套用 DDPOptimizer
(圖形中斷優化)。(有關更多資訊,請參閱TorchDynamo DDPOptimizer。)
ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)
內部設計¶
本節深入探討 torch.nn.parallel.DistributedDataParallel
的底層運作方式,詳細說明單次迭代中的每個步驟。
先決條件:DDP 依賴 c10d
ProcessGroup
進行通訊。因此,應用程式必須在建構 DDP 之前建立ProcessGroup
實例。建構:DDP 建構函式接受對本地模組的引用,並將
state_dict()
從 rank 0 的進程廣播到群組中的所有其他進程,以確保所有模型副本都從完全相同的狀態開始。然後,每個 DDP 進程都會建立一個本地Reducer
,稍後它將負責反向傳播期間的梯度同步。為了提高通訊效率,Reducer
將參數梯度組織成 bucket,並一次減少一個 bucket。可以通過在 DDP 建構函式中設置 bucket_cap_mb 參數來配置 Bucket 大小。從參數梯度到 bucket 的映射在建構時確定,基於 bucket 大小限制和參數大小。模型參數以(大致)與給定模型中的Model.parameters()
相反的順序分配到 bucket 中。使用相反順序的原因是因為 DDP 期望梯度在反向傳播期間以大約該順序準備好。下圖顯示了一個例子。請注意,grad0
和grad1
在bucket1
中,而其他兩個梯度在bucket0
中。當然,這種假設可能並不總是正確的,當發生這種情況時,可能會損害 DDP 的反向速度,因為Reducer
無法儘早啟動通訊。除了 bucketing 之外,Reducer
還在建構期間註冊 autograd hook,每個參數一個 hook。當梯度準備好時,這些 hook 將在反向傳播期間被觸發。前向傳播:DDP 獲取輸入並將其傳遞到本地模型,然後,如果
find_unused_parameters
設置為True
,則分析來自本地模型的輸出。此模式允許在模型的子圖上運行反向傳播,DDP 通過從模型輸出遍歷 autograd 圖並將所有未使用的參數標記為已準備好進行縮減,從而找出哪些參數參與了反向傳播。在反向傳播期間,Reducer
只會等待未準備好的參數,但它仍會減少所有 bucket。將參數梯度標記為已準備好並不能幫助 DDP 跳過 bucket,但它可以防止 DDP 在反向傳播期間永遠等待不存在的梯度。請注意,遍歷 autograd 圖會引入額外的開銷,因此應用程式應僅在必要時才將find_unused_parameters
設置為True
。反向傳播:
backward()
函數直接在 lossTensor
上調用,這超出了 DDP 的控制範圍,DDP 使用在建構時註冊的 autograd hook 來觸發梯度同步。當一個梯度準備好時,其在該梯度累加器上的相應 DDP hook 將觸發,然後 DDP 將該參數梯度標記為已準備好進行縮減。當一個 bucket 中的梯度都準備好時,Reducer
會在該 bucket 上啟動異步allreduce
,以計算所有進程中梯度的平均值。當所有 bucket 都準備好時,Reducer
將阻止等待所有allreduce
操作完成。完成後,平均梯度將被寫入所有參數的param.grad
字段。因此,在反向傳播之後,不同 DDP 進程上相同對應參數的 grad 字段應該是相同的。優化器步驟:從優化器的角度來看,它正在優化本地模型。所有 DDP 進程上的模型副本都可以保持同步,因為它們都從相同的狀態開始,並且在每次迭代中都具有相同的平均梯度。

注意
DDP 需要所有進程上的 Reducer
實例以完全相同的順序調用 allreduce
,這是通過始終以 bucket 索引順序而不是實際的 bucket 就緒順序運行 allreduce
來完成的。跨進程的不匹配的 allreduce
順序可能導致錯誤的結果或 DDP 反向掛起。
實作¶
以下是指向 DDP 實作元件的指標。堆疊圖顯示了程式碼的結構。
ProcessGroup¶
ProcessGroup.hpp:包含所有進程組實作的抽象 API。
c10d
庫提供了 3 種現成的實作,即 ProcessGroupGloo、ProcessGroupNCCL 和 ProcessGroupMPI。DistributedDataParallel
使用ProcessGroup::broadcast()
在初始化期間將模型狀態從 rank 0 的進程發送到其他進程,並使用ProcessGroup::allreduce()
對梯度求和。Store.hpp:協助進程組實例的 rendezvous 服務以找到彼此。
DistributedDataParallel¶
distributed.py:是 DDP 的 Python 進入點。它實現了初始化步驟以及
nn.parallel.DistributedDataParallel
模組的forward
函數,該函數會呼叫 C++ 函式庫。它的_sync_param
函數在一個 DDP 處理程序在多個裝置上工作時執行進程內參數同步,並且它還將模型緩衝區從 rank 0 的處理程序廣播到所有其他處理程序。進程間參數同步發生在Reducer.cpp
中。comm.h:實現了 coalesced broadcast 輔助函數,該函數被調用以在初始化期間廣播模型狀態,並在 forward 傳遞之前同步模型緩衝區。
reducer.h:為 backward 傳遞中的梯度同步提供核心實現。它有三個進入點函數
Reducer
:建構函式在distributed.py
中被呼叫,它將Reducer::autograd_hook()
註冊到梯度累加器。autograd_hook()
函數將在梯度準備就緒時由 autograd 引擎調用。prepare_for_backward()
在distributed.py
中 DDP forward 傳遞結束時被呼叫。當 DDP 建構函式中find_unused_parameters
設定為True
時,它會遍歷 autograd 圖以尋找未使用的參數。

TorchDynamo DDPOptimizer¶
DDP 的性能優勢來自於在 backward 期間將 allreduce collectives 與計算重疊。當 AotAutograd 與 TorchDynamo 一起使用,編譯整個 forward 和整個 backward 圖時,會阻止這種重疊,因為 allreduce 操作是在整個最佳化 backward 計算完成_後_由 autograd hooks 啟動的。
TorchDynamo 的 DDPOptimizer 透過在 backward 期間於 DDP 的 allreduce buckets 的邏輯邊界處中斷 forward 圖來提供幫助。注意:目標是在 backward 期間中斷圖,最簡單的實現是中斷 forward 圖,然後對每個部分呼叫 AotAutograd 並進行編譯。這允許 DDP 的 allreduce hooks 在 backward 的各個部分之間觸發,並安排通訊與計算重疊。
有關更深入的解釋和實驗結果,請參閱此部落格文章,或閱讀 torch/_dynamo/optimizations/distributed.py 上的文檔和程式碼。
要除錯 DDPOptimizer,請設定 TORCH_LOGS='ddp_graphs' 以取得完整的圖形傾印。對於沒有圖形的日誌,將 'dynamo'、'distributed' 或 'dist_ddp' 中的任何一個新增到 TORCH_LOGS(以取得有關 bucket 邊界的基本資訊)。要停用 DDPOptimizer,請設定 torch._dynamo.config.optimize_ddp=False。DDP 和 TorchDynamo 仍然可以在沒有 DDPOptimizer 的情況下正確工作,但性能會降低。