捷徑

分散式資料平行

class torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=None, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False, delay_all_reduce_named_params=None, param_to_hook_all_reduce=None, mixed_precision=None, device_mesh=None)[source][source]

基於模組層級的 torch.distributed 實現分散式資料平行處理。

此容器透過同步每個模型副本之間的梯度來提供資料平行處理。要同步的裝置由輸入的 process_group 指定,預設為整個世界。請注意,DistributedDataParallel 不會對輸入進行分塊或以其他方式在參與的 GPU 之間進行分片;使用者有責任定義如何執行此操作,例如透過使用 DistributedSampler

另請參閱:基礎知識使用 nn.parallel.DistributedDataParallel 而不是 multiprocessing 或 nn.DataParallel。與 torch.nn.DataParallel 中相同的輸入限制適用。

建立此類別需要已經初始化 torch.distributed,方法是呼叫 torch.distributed.init_process_group()

經驗證,對於單節點多 GPU 資料平行訓練,DistributedDataParalleltorch.nn.DataParallel 快得多。

若要在具有 N 個 GPU 的主機上使用 DistributedDataParallel,您應該啟動 N 個進程,確保每個進程都專門在從 0 到 N-1 的單個 GPU 上工作。這可以透過為每個進程設定 CUDA_VISIBLE_DEVICES 或呼叫來完成

>>> torch.cuda.set_device(i)

其中 i 從 0 到 N-1。在每個進程中,您應該參考以下內容來建構此模組

>>> torch.distributed.init_process_group(
>>>     backend='nccl', world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

為了在每個節點上啟動多個進程,您可以使用 torch.distributed.launchtorch.multiprocessing.spawn

注意

請參閱 PyTorch 分散式概觀,以簡要介紹與分散式訓練相關的所有功能。

注意

DistributedDataParallel 可以與 torch.distributed.optim.ZeroRedundancyOptimizer 結合使用,以減少每個 rank 的最佳化器狀態記憶體佔用。有關更多詳細資訊,請參閱 ZeroRedundancyOptimizer 配方

注意

當使用 GPU 時,nccl 後端是目前最快且強烈建議的後端。這適用於單節點和多節點分散式訓練。

注意

此模組還支援混合精度分散式訓練。這表示您的模型可以具有不同類型的參數,例如 fp16fp32 的混合類型,對這些混合類型參數的梯度縮減將正常工作。

注意

如果您在一個進程上使用 torch.save 來檢查點模組,並在其他一些進程上使用 torch.load 來恢復它,請確保為每個進程正確配置 map_location。如果沒有 map_locationtorch.load 會將模組恢復到從中儲存模組的裝置。

注意

當模型在具有 batch=NM 個節點上訓練時,如果損失在批次中的實例之間被加總(通常 *不* 是平均),則梯度將比在單個節點上使用 batch=M*N 訓練的相同模型小 M 倍(因為不同節點之間的梯度被平均)。當您想要獲得與本地訓練對應物在數學上等效的訓練過程時,您應該考慮這一點。但在大多數情況下,您可以將 DistributedDataParallel 包裝的模型、DataParallel 包裝的模型以及單個 GPU 上的普通模型視為相同(例如,對於等效批次大小使用相同的學習率)。

注意

參數永遠不會在進程之間廣播。此模組對梯度執行 all-reduce 步驟,並假設它們會以相同的方式在所有進程中被優化器修改。緩衝區 (例如 BatchNorm 統計數據) 會在每次迭代中從 rank 0 進程中的模組廣播到系統中的所有其他副本。

注意

如果您將 DistributedDataParallel 與 Distributed RPC 框架 一起使用,您應該始終使用 torch.distributed.autograd.backward() 來計算梯度,並使用 torch.distributed.optim.DistributedOptimizer 來優化參數。

範例

>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> import torch
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # Setup optimizer
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>>     optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>>     optim.SGD,
>>>     optimizer_params,
>>>     lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>>     pred = ddp_model(rref.to_here())
>>>     loss = loss_func(pred, target)
>>>     dist_autograd.backward(context_id, [loss])
>>>     dist_optim.step(context_id)

注意

DistributedDataParallel 目前對使用 torch.utils.checkpoint() 進行梯度檢查點的支持有限。如果檢查點是使用 use_reentrant=False (建議) 完成的,則 DDP 將按預期工作,沒有任何限制。但是,如果檢查點是使用 use_reentrant=True (預設) 完成的,則當模型中沒有未使用的參數,並且每個層最多檢查一次時,DDP 將按預期工作 (請確保您沒有將 find_unused_parameters=True 傳遞給 DDP)。我們目前不支持一個層被檢查多次,或者在檢查模型中存在未使用的參數的情況。

注意

為了讓非 DDP 模型從 DDP 模型載入 state dict,需要應用 consume_prefix_in_state_dict_if_present() 來去除 DDP state dict 中的前綴 "module.",然後再載入。

警告

建構子、forward 方法和輸出的微分 (或此模組輸出的函數) 是分散式同步點。如果不同的進程可能正在執行不同的代碼,請考慮這一點。

警告

此模組假定所有參數在建立時已在模型中註冊。之後不應添加或刪除任何參數。同樣適用於緩衝區。

警告

此模組假定每個分散式進程的模型中註冊的所有參數順序相同。該模組本身將按照模型註冊參數的相反順序執行梯度 allreduce。換句話說,用戶有責任確保每個分散式進程具有完全相同的模型,因此具有完全相同的參數註冊順序。

警告

此模組允許具有非 rowmajor-contiguous strides 的參數。例如,您的模型可能包含一些參數,它們的 torch.memory_formattorch.contiguous_format,而另一些參數的格式是 torch.channels_last。但是,不同進程中的對應參數必須具有相同的 strides。

警告

此模組不適用於 torch.autograd.grad() (也就是說,它僅在梯度將累積在參數的 .grad 屬性中時才有效)。

警告

如果您計劃將此模組與 nccl 後端或 gloo 後端 (使用 Infiniband) 一起使用,並且 DataLoader 使用多個 worker,請將多進程啟動方法更改為 forkserver (僅限 Python 3) 或 spawn。不幸的是,Gloo (使用 Infiniband) 和 NCCL2 不是 fork 安全的,如果您不更改此設定,您可能會遇到死鎖。

警告

在用 DistributedDataParallel 包裹模型後,您永遠不應嘗試更改模型的參數。因為,當使用 DistributedDataParallel 包裹模型時,DistributedDataParallel 的建構子會在建構時在模型的所有參數上註冊額外的梯度縮減函數。如果您之後更改模型的參數,梯度縮減函數將不再與正確的參數集匹配。

警告

DistributedDataParallelDistributed RPC 框架 結合使用是實驗性的,可能會發生變化。

參數
  • module (Module) – 要並行化的模組

  • device_ids (list of int or torch.device) –

    CUDA 裝置。1) 對於單裝置模組,device_ids 可以包含恰好一個裝置 id,它代表此進程對應的輸入模組所在的唯一 CUDA 裝置。或者,device_ids 也可以是 None。2) 對於多裝置模組和 CPU 模組,device_ids 必須為 None

    當兩種情況下的 device_ids 都是 None 時,forward 傳遞的輸入數據和實際模組都必須放置在正確的裝置上。(預設: None)

  • output_device (int or torch.device) – 單裝置 CUDA 模組輸出的裝置位置。對於多裝置模組和 CPU 模組,它必須為 None,並且模組本身決定輸出位置。(預設: 單裝置模組的 device_ids[0])

  • broadcast_buffers (bool) – 一個標誌,用於在 forward 函數開始時啟用同步 (廣播) 模組的緩衝區。(預設: True)

  • process_group – 用於分散式資料全歸約 (all-reduction) 的處理程序群組。如果為 None,則會使用預設的處理程序群組,該群組由 torch.distributed.init_process_group() 建立。(預設值:None)

  • bucket_cap_mbDistributedDataParallel 會將參數分組到多個 buckets 中,以便每個 bucket 的梯度歸約 (gradient reduction) 可以與反向傳播計算潛在地重疊。bucket_cap_mb 控制 bucket 的大小,單位為 MebiBytes (MiB)。如果為 None,則會使用 25 MiB 的預設大小。(預設值:None)

  • find_unused_parameters (bool) – 從已封裝模組的 forward 函數傳回值中包含的所有張量,遍歷 autograd 圖。未接收到此圖一部分的梯度的參數會被搶先標記為已準備好進行歸約。此外,可能已在已封裝模組的 forward 函數中使用,但不是損失計算的一部分,因此也不會接收梯度的參數也會被搶先標記為已準備好進行歸約。(預設值:False)

  • check_reduction – 此參數已棄用。

  • gradient_as_bucket_view (bool) – 當設定為 True 時,梯度將是指向 allreduce 通訊 buckets 不同偏移的視圖。這可以減少峰值記憶體使用量,節省的記憶體大小將等於總梯度大小。此外,它避免了在梯度和 allreduce 通訊 buckets 之間複製的開銷。當梯度是視圖時,無法在梯度上呼叫 detach_()。如果遇到此類錯誤,請參考 zero_grad() 函數,該函數位於 torch/optim/optimizer.py 中,以作為解決方案。請注意,梯度將在第一次迭代後成為視圖,因此應在第一次迭代後檢查峰值記憶體節省。

  • static_graph (bool) –

    當設定為 True 時,DDP 知道訓練圖是靜態的。靜態圖表示:1) 已使用和未使用參數的集合在整個訓練迴圈中不會改變;在這種情況下,使用者是否設定 find_unused_parameters = True 並不重要。2) 圖的訓練方式在整個訓練迴圈中不會改變(表示沒有依賴於迭代的控制流程)。當 static_graph 設定為 True 時,DDP 將支援過去無法支援的情況:1) 可重入的反向傳播。2) 多次啟用檢查點。3) 模型具有未使用參數時啟用檢查點。4) 有模型參數位於 forward 函數之外。5) 當存在未使用參數時,可能會提高效能,因為當 static_graph 設定為 True 時,DDP 將不會在每次迭代中搜尋圖以偵測未使用參數。若要檢查是否可以將 static_graph 設定為 True,一種方法是在先前模型訓練結束時檢查 ddp 記錄資料,如果 ddp_logging_data.get("can_set_static_graph") == True,則大多數情況下也可以設定 static_graph = True

    範例:
    >>> model_DDP = torch.nn.parallel.DistributedDataParallel(model)
    >>> # Training loop
    >>> ...
    >>> ddp_logging_data = model_DDP._get_ddp_logging_data()
    >>> static_graph = ddp_logging_data.get("can_set_static_graph")
    

  • delay_all_reduce_named_params (list of tuple of str and torch.nn.Parameter) – 一個命名參數列表,當 param_to_hook_all_reduce 中指定的參數的梯度準備就緒時,其 all reduce 將會被延遲。DDP 的其他參數不適用於此參數中指定的命名參數,因為這些命名參數將被 DDP reducer 忽略。

  • param_to_hook_all_reduce (torch.nn.Parameter) – 一個參數,用於掛鉤 delay_all_reduce_named_params 中指定的參數的延遲 all reduce。

變數

module (Module) – 要平行化的模組。

範例

>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.parallel.DistributedDataParallel(model)
join(divide_by_initial_world_size=True, enable=True, throw_on_early_termination=False)[來源][來源]

用於在 DDP 中跨程序進行非均勻輸入訓練的情境管理器。

此情境管理器將追蹤已加入的 DDP 程序,並透過插入集體通訊操作來「陰影」forward 和 backward 傳遞,以與非加入的 DDP 程序建立的通訊操作相匹配。這將確保每個集體呼叫都有已加入的 DDP 程序的相應呼叫,從而防止在跨程序進行非均勻輸入訓練時可能發生的掛起或錯誤。或者,如果將旗標 throw_on_early_termination 指定為 True,則一旦一個 rank 的輸入耗盡,所有訓練器將會拋出錯誤,從而允許根據應用程式邏輯捕獲和處理這些錯誤。

一旦所有 DDP 程序都已加入,情境管理器將廣播對應於最後加入程序的模型到所有程序,以確保所有程序中的模型都相同(DDP 保證)。

若要使用此功能來啟用跨程序進行非均勻輸入的訓練,只需將此情境管理器包裝在訓練迴圈周圍即可。無需對模型或資料載入進行進一步修改。

警告

如果此情境管理器包裝的模型或訓練迴圈具有其他分散式集體操作,例如模型 forward 傳遞中的 SyncBatchNorm,則必須啟用旗標 throw_on_early_termination。這是因為此情境管理器不知道非 DDP 集體通訊。此旗標將導致所有 rank 在任何一個 rank 耗盡輸入時拋出,從而允許跨所有 rank 捕獲和恢復這些錯誤。

參數
  • divide_by_initial_world_size (bool) – 如果為 True,則將梯度除以啟動 DDP 訓練時的初始 world_size。 如果為 False,則計算有效的 world size(尚未耗盡輸入的 rank 數量),並在 allreduce 期間將梯度除以該值。 設定 divide_by_initial_world_size=True 以確保每個輸入樣本,包括不均勻的輸入,在對全域梯度的貢獻方面具有相同的權重。 這是透過始終將梯度除以初始 world_size 來實現的,即使遇到不均勻的輸入也是如此。 如果您將此設定為 False,則將梯度除以剩餘節點的數量。 這樣可以確保與在較小的 world_size 上訓練的一致性,儘管這也意味著不均勻的輸入將對全域梯度產生更大的影響。 通常,對於訓練任務的最後幾個輸入不均勻的情況,您會希望將此設定為 True。 在極端情況下,輸入數量存在很大差異時,將此設定為 False 可能會提供更好的結果。

  • enable (bool) – 是否啟用不均勻輸入檢測。 如果您知道參與進程中的輸入是均勻的,請傳入 enable=False 以停用。 預設值為 True

  • throw_on_early_termination (bool) – 當至少有一個 rank 耗盡輸入時,是否拋出錯誤或繼續訓練。 如果為 True,則會在第一個 rank 到達資料末尾時拋出錯誤。 如果為 False,則將繼續使用較小的有效 world size 進行訓練,直到所有 rank 都加入。 請注意,如果指定此標誌,則將忽略標誌 divide_by_initial_world_size。 預設值為 False

範例

>>> import torch
>>> import torch.distributed as dist
>>> import os
>>> import torch.multiprocessing as mp
>>> import torch.nn as nn
>>> # On each spawned worker
>>> def worker(rank):
>>>     dist.init_process_group("nccl", rank=rank, world_size=2)
>>>     torch.cuda.set_device(rank)
>>>     model = nn.Linear(1, 1, bias=False).to(rank)
>>>     model = torch.nn.parallel.DistributedDataParallel(
>>>         model, device_ids=[rank], output_device=rank
>>>     )
>>>     # Rank 1 gets one more input than rank 0.
>>>     inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
>>>     with model.join():
>>>         for _ in range(5):
>>>             for inp in inputs:
>>>                 loss = model(inp).sum()
>>>                 loss.backward()
>>>     # Without the join() API, the below synchronization will hang
>>>     # blocking for rank 1's allreduce to complete.
>>>     torch.cuda.synchronize(device=rank)
join_hook(**kwargs)[source][source]

DDP join hook 透過鏡像 forward 和 backward 過程中的通訊,實現對不均勻輸入的訓練。

參數

kwargs (dict) – 一個 dict,包含用於在運行時修改 join hook 行為的任何關鍵字引數;所有共享相同 join context manager 的 Joinable 實例都會收到相同的 kwargs 值。

此 hook 支援以下關鍵字引數
divide_by_initial_world_size (bool, optional)

如果為 True,則梯度除以啟動 DDP 時的初始 world size。 如果為 False,則梯度除以有效的 world size(即未加入的進程數),這意味著不均勻的輸入對全域梯度的貢獻更大。 通常,如果不均勻程度較小,則應將此設定為 True,但在極端情況下可以將其設定為 False,以獲得可能更好的結果。 預設值為 True

no_sync()[source][source]

Context manager 用於停用 DDP 進程之間的梯度同步。

在此上下文中,梯度將累積在模組變數上,這些變數將在退出上下文的第一次 forward-backward 過程中同步。

範例

>>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg)
>>> with ddp.no_sync():
>>>     for input in inputs:
>>>         ddp(input).backward()  # no synchronization, accumulate grads
>>> ddp(another_input).backward()  # synchronize grads

警告

forward 過程應包含在 context manager 內,否則梯度仍將同步。

register_comm_hook(state, hook)[source][source]

註冊通訊 hook,用於跨多個 worker 進行使用者定義的 DDP 梯度聚合。

此 hook 對於研究人員嘗試新想法非常有用。 例如,此 hook 可用於實現多種演算法,例如 GossipGrad 和梯度壓縮,這些演算法涉及不同的通訊策略,用於在運行 Distributed DataParallel 訓練時同步參數。

參數
  • state (object) –

    傳遞給 hook 以在訓練過程中維護任何狀態資訊。 範例包括梯度壓縮中的錯誤回饋、在 GossipGrad 中接下來要通訊的 peer 等。

    它由每個 worker 本地儲存,並由 worker 上的所有梯度 tensor 共享。

  • hook (Callable) –

    具有以下簽章的可呼叫物件: hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]

    此函數會在 bucket 準備就緒後呼叫。Hook 可以執行任何所需的處理,並傳回一個 Future,表示任何非同步工作(例如 allreduce)已完成。如果 hook 不執行任何通訊,它仍然必須傳回一個已完成的 Future。此 Future 應該包含 grad bucket 的 tensors 的新值。一旦 bucket 準備好,c10d reducer 將呼叫此 hook 並使用 Future 傳回的 tensors,並將 grads 複製到個別的參數。請注意,future 的傳回類型必須是單個 tensor。

    我們還提供一個名為 get_future 的 API,用於檢索與 c10d.ProcessGroup.Work 完成相關聯的 Future。get_future 目前支援 NCCL,也支援 GLOO 和 MPI 的大多數操作,除了點對點操作(send/recv)。

警告

Grad bucket 的 tensors 不會預先除以 world_size。使用者有責任在像 allreduce 這樣的操作中除以 world_size。

警告

DDP 通訊 hook 只能註冊一次,並且應該在呼叫 backward 之前註冊。

警告

Hook 傳回的 Future 物件應該包含一個單個 tensor,其形狀與 grad bucket 內的 tensors 相同。

警告

get_future API 支援 NCCL,以及部分 GLOO 和 MPI 後端(不支援點對點操作,如 send/recv),並將傳回一個 torch.futures.Future

範例:

以下是一個 noop hook 的範例,它傳回相同的 tensor。

>>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>>     fut = torch.futures.Future()
>>>     fut.set_result(bucket.buffer())
>>>     return fut
>>> ddp.register_comm_hook(state=None, hook=noop)
範例:

以下是一個平行 SGD 演算法的範例,其中梯度在 allreduce 之前進行編碼,然後在 allreduce 之後進行解碼。

>>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>>     encoded_tensor = encode(bucket.buffer())  # encode gradients
>>>     fut = torch.distributed.all_reduce(encoded_tensor).get_future()
>>>     # Define the then callback to decode.
>>>     def decode(fut):
>>>         decoded_tensor = decode(fut.value()[0])  # decode gradients
>>>         return decoded_tensor
>>>     return fut.then(decode)
>>> ddp.register_comm_hook(state=None, hook=encode_and_decode)

文件

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources