分散式通訊套件 - torch.distributed¶
注意
請參閱 PyTorch 分散式概觀,簡要介紹與分散式訓練相關的所有功能。
後端¶
torch.distributed
支援三個內建後端,每個後端都有不同的功能。下表顯示哪些函數可用於 CPU / CUDA Tensor。MPI 僅在用於建置 PyTorch 的實作支援 CUDA 時才支援 CUDA。
後端 |
|
|
|
|||
---|---|---|---|---|---|---|
裝置 |
CPU |
GPU |
CPU |
GPU |
CPU |
GPU |
send |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
recv |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
broadcast |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
all_reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
reduce |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
all_gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
scatter |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
reduce_scatter |
✘ |
✘ |
✘ |
✘ |
✘ |
✓ |
all_to_all |
✘ |
✘ |
✓ |
? |
✘ |
✓ |
barrier |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
PyTorch 隨附的後端¶
PyTorch 分散式套件支援 Linux (穩定版)、MacOS (穩定版) 和 Windows (原型版)。預設情況下,對於 Linux,Gloo 和 NCCL 後端會被建置並包含在 PyTorch 分散式套件中 (NCCL 僅在使用 CUDA 建置時才會包含)。MPI 是一個可選的後端,只有在您從原始碼建置 PyTorch 時才能包含 (例如,在已安裝 MPI 的主機上建置 PyTorch)。
注意
從 PyTorch v1.8 開始,Windows 支援所有 collective communications 後端,但 NCCL 除外。如果 init_method 參數 init_process_group()
指向檔案,則它必須符合以下架構:
本機檔案系統,
init_method="file:///d:/tmp/some_file"
共享檔案系統,
init_method="file://////{machine_name}/{share_folder_name}/some_file"
與 Linux 平台相同,您可以透過設定環境變數 MASTER_ADDR 和 MASTER_PORT 來啟用 TcpStore。
該使用哪個後端?¶
過去,我們經常被問到:「我應該使用哪個後端?」
經驗法則
對於分散式 GPU 訓練,請使用 NCCL 後端。
對於分散式 CPU 訓練,請使用 Gloo 後端。
具有 InfiniBand 互連的 GPU 主機
使用 NCCL,因為它是目前唯一支援 InfiniBand 和 GPUDirect 的後端。
具有乙太網路互連的 GPU 主機
使用 NCCL,因為它目前提供最佳的分散式 GPU 訓練效能,尤其是對於多進程單節點或多節點分散式訓練。如果您在使用 NCCL 時遇到任何問題,請使用 Gloo 作為備用選項。(請注意,Gloo 目前在 GPU 上執行速度比 NCCL 慢)。
具有 InfiniBand 互連的 CPU 主機
如果您的 InfiniBand 已啟用 IP over IB,請使用 Gloo,否則請改用 MPI。我們計劃在即將發佈的版本中新增對 Gloo 的 InfiniBand 支援。
具有乙太網路互連的 CPU 主機
使用 Gloo,除非您有特殊理由使用 MPI。
常見的環境變數¶
選擇要使用的網路介面¶
預設情況下,NCCL 和 Gloo 後端都會嘗試找到要使用的正確網路介面。如果自動偵測到的介面不正確,您可以使用以下環境變數覆寫它 (適用於各自的後端):
NCCL_SOCKET_IFNAME,例如
export NCCL_SOCKET_IFNAME=eth0
GLOO_SOCKET_IFNAME,例如
export GLOO_SOCKET_IFNAME=eth0
如果您正在使用 Gloo 後端,您可以透過逗號分隔來指定多個介面,如下所示:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
。後端將以循環方式跨這些介面分派操作。所有進程都必須在此變數中指定相同數量的介面。
其他 NCCL 環境變數¶
除錯 - 如果 NCCL 失敗,您可以設定 NCCL_DEBUG=INFO
來列印明確的警告訊息以及基本的 NCCL 初始化資訊。
您也可以使用 NCCL_DEBUG_SUBSYS
來取得關於 NCCL 特定方面的更多詳細資訊。例如,NCCL_DEBUG_SUBSYS=COLL
將列印 collective 呼叫的日誌,這可能有助於除錯掛起,尤其是那些由 collective 類型或訊息大小不符引起的掛起。如果拓撲偵測失敗,設定 NCCL_DEBUG_SUBSYS=GRAPH
來檢查詳細的偵測結果並儲存為參考,以便在需要 NCCL 團隊的進一步協助時使用,將會很有幫助。
效能調校 - NCCL 根據其拓撲偵測執行自動調校,以節省使用者的調校工作。在某些基於 socket 的系統上,使用者仍然可以嘗試調校 NCCL_SOCKET_NTHREADS
和 NCCL_NSOCKS_PERTHREAD
以增加 socket 網路頻寬。NCCL 已經為某些雲端供應商 (例如 AWS 或 GCP) 預先調校了這兩個環境變數。
有關 NCCL 環境變數的完整清單,請參閱 NVIDIA NCCL 的官方文件
基礎¶
torch.distributed 套件為在一個或多個機器上運行的多個計算節點提供 PyTorch 支援和通訊基元,以實現多進程並行。torch.nn.parallel.DistributedDataParallel()
類別基於此功能建構,以提供同步分散式訓練,作為任何 PyTorch 模型的封裝器。這與 多進程套件 - torch.multiprocessing 和 torch.nn.DataParallel()
提供的並行類型不同,因為它支援多個網路連接的機器,並且使用者必須為每個進程顯式啟動主訓練腳本的單獨副本。
在單機同步案例中,torch.distributed 或 torch.nn.parallel.DistributedDataParallel()
封裝器可能仍然比其他資料並行方法 (包括 torch.nn.DataParallel()
) 具有優勢。
每個進程都維護自己的最佳化器,並在每次迭代時執行完整的最佳化步驟。雖然這看起來可能是多餘的,因為梯度已經被收集在一起並在各個進程中平均,因此對於每個進程都是相同的,但這意味著不需要參數廣播步驟,從而減少了在節點之間傳輸張量的時間。
每個進程都包含一個獨立的 Python 直譯器,消除了額外的直譯器開銷以及來自驅動多個執行緒、模型副本或來自單個 Python 進程的 GPU 的 "GIL-thrashing"。對於大量使用 Python 執行時期的模型 (包括具有循環層或許多小型組件的模型) 而言,這尤其重要。
初始化¶
在呼叫任何其他方法之前,需要使用 torch.distributed.init_process_group()
或 torch.distributed.device_mesh.init_device_mesh()
函數初始化套件。 兩個函數都會阻塞,直到所有程序都加入。
警告
初始化並非執行緒安全。程序群組的建立應從單一執行緒執行,以防止各個 rank 之間出現不一致的 'UUID' 分配,並防止初始化期間的競爭情況,可能導致程序掛起。
- torch.distributed.is_available()[來源][來源]¶
如果 distributed 套件可用,則傳回
True
。否則,
torch.distributed
不會公開任何其他 API。 目前,torch.distributed
在 Linux、MacOS 和 Windows 上可用。 將USE_DISTRIBUTED=1
設定為從原始碼建置 PyTorch 時啟用它。 目前,Linux 和 Windows 的預設值為USE_DISTRIBUTED=1
,MacOS 的預設值為USE_DISTRIBUTED=0
。- 回傳類型
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[來源][來源]¶
初始化預設的 distributed 程序群組。
這也會初始化 distributed 套件。
- 有兩種主要方式可以初始化程序群組
明確指定
store
、rank
和world_size
。指定
init_method
(一個 URL 字串),指示在哪裡/如何發現 peers。 選擇性地指定rank
和world_size
,或者在 URL 中編碼所有需要的參數並省略它們。
如果兩者都未指定,則
init_method
預設為 "env://"。- 參數
backend (str or Backend, optional) – 要使用的後端。 根據建置時的設定,有效值包括
mpi
、gloo
、nccl
、ucc
,或由第三方外掛程式註冊的後端。 從 2.6 開始,如果未提供backend
,則 c10d 將使用為 device_id kwarg 指示的裝置類型註冊的後端 (如果提供)。 目前已知的預設註冊為:nccl
用於cuda
,gloo
用於cpu
。 如果未提供backend
和device_id
,則 c10d 將偵測執行時機器上的加速器,並使用為該偵測到的加速器 (或cpu
) 註冊的後端。 此欄位可以作為小寫字串給出 (例如,"gloo"
),也可以透過Backend
屬性存取 (例如,Backend.GLOO
)。 如果使用每台機器多個程序的nccl
後端,則每個程序都必須具有對其使用的每個 GPU 的獨佔存取權,因為在程序之間共享 GPU 可能會導致死鎖或 NCCL 無效使用。ucc
後端是實驗性的。init_method (str, optional) – URL,指定如何初始化程序群組。 如果未指定
init_method
或store
,則預設為 "env://"。 與store
互斥。world_size (int, optional) – 參與作業的程序數量。 如果指定了
store
,則為必填項。rank (int, optional) – 目前process的rank(應為介於0到
world_size
-1之間的數字)。如果指定了store
,則為必填項。store (Store, optional) – 可供所有worker存取的key/value store,用於交換連線/位址資訊。與
init_method
互斥。timeout (timedelta, optional) – 針對 process group 執行的操作的逾時時間。NCCL 的預設值為 10 分鐘,其他 backend 的預設值為 30 分鐘。這是非同步中止 collectives 且程序將崩潰的時間長度。這是因為 CUDA 執行是異步的,並且繼續執行使用者程式碼不再安全,因為失敗的異步 NCCL 操作可能會導致後續 CUDA 操作在損毀的資料上執行。當設定 TORCH_NCCL_BLOCKING_WAIT 時,程序將會封鎖並等待此逾時時間。
group_name (str, optional, deprecated) – 群組名稱。此參數會被忽略。
pg_options (ProcessGroupOptions, optional) – process group 選項,指定在建構特定 process group 期間需要傳入哪些其他選項。目前,我們唯一支援的選項是
nccl
backend 的ProcessGroupNCCL.Options
,可以指定is_high_priority_stream
,以便 nccl backend 可以在有運算核心等待時挑選高優先順序的 cuda streams。 如需配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t。device_id (torch.device, optional) – 用於將此 process「綁定」到的單個特定設備,以實現 backend 特定的最佳化。 目前,這僅在 NCCL 下有兩個作用:立即形成 communicator(立即呼叫
ncclCommInit*
,而不是正常的延遲呼叫),並且子群組將在可能的情況下使用ncclCommSplit
,以避免不必要的群組建立開銷。 如果您想盡早知道 NCCL 初始化錯誤,也可以使用此欄位。
注意
要啟用
backend == Backend.MPI
,需要在支援 MPI 的系統上從原始碼建構 PyTorch。注意
對多個 backend 的支援是實驗性的。 目前,如果未指定任何 backend,則將建立
gloo
和nccl
backend。gloo
backend 將用於具有 CPU tensors 的 collectives,而nccl
backend 將用於具有 CUDA tensors 的 collectives。 可以通過傳入格式為“<device_type>:<backend_name>,<device_type>:<backend_name>”的字串來指定自訂 backend,例如“cpu:gloo,cuda:custom_backend”。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[source][source]¶
基於 device_type、mesh_shape 和 mesh_dim_names 參數初始化 DeviceMesh。
這會建立具有 n 維陣列佈局的 DeviceMesh,其中 n 是 mesh_shape 的長度。 如果提供 mesh_dim_names,則每個維度都會標記為 mesh_dim_names[i]。
注意
init_device_mesh 遵循 SPMD 程式設計模型,這表示相同的 PyTorch Python 程式會在叢集中的所有 processes/ranks 上執行。 確保 mesh_shape(描述設備佈局的 nD 陣列的維度)在所有 ranks 上都相同。 不一致的 mesh_shape 可能導致程式卡住。
注意
如果找不到任何 process group,init_device_mesh 將初始化在後端進行分散式通訊所需的分散式 process group。
- 參數
- Returns
代表設備佈局的
DeviceMesh
物件。- 回傳類型
- Example:
>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
- torch.distributed.is_torchelastic_launched()[原始碼][原始碼]¶
檢查此行程是否使用
torch.distributed.elastic
(又名 torchelastic) 啟動。使用
TORCHELASTIC_RUN_ID
環境變數的存在作為代理來確定目前的行程是否使用 torchelastic 啟動。 這是一個合理的代理,因為TORCHELASTIC_RUN_ID
映射到 rendezvous id,它始終是一個非空值,指示用於對等探索的作業 ID。- 回傳類型
目前支援三種初始化方法
TCP 初始化¶
有兩種方法可以使用 TCP 進行初始化,這兩種方法都需要所有行程都可存取的網路位址和所需的 world_size
。第一種方法需要指定屬於 rank 0 行程的位址。此初始化方法要求所有行程都手動指定 rank。
請注意,最新的 distributed 套件不再支援多播位址。 group_name
也已棄用。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
環境變數初始化¶
此方法將從環境變數中讀取組態,允許您完全自訂如何取得資訊。 要設定的變數為
MASTER_PORT
- 必需;必須是 rank 0 機器上的可用連接埠MASTER_ADDR
- 必需 (rank 0 除外);rank 0 節點的位址WORLD_SIZE
- 必需;可以在此處或在呼叫 init 函數時設定RANK
- 必需;可以在此處或在呼叫 init 函數時設定
rank 0 的機器將用於設定所有連線。
這是預設方法,這表示不必指定 init_method
(或者可以是 env://
)。
初始化後¶
執行 torch.distributed.init_process_group()
後,可以使用以下函數。 若要檢查進程組是否已初始化,請使用 torch.distributed.is_initialized()
。
- class torch.distributed.Backend(name)[原始碼][原始碼]¶
用於 backends 的類似枚舉的類別。
可用的 backends:GLOO、NCCL、UCC、MPI、XCCL 和其他已註冊的 backends。
此類別的值為小寫字串,例如,
"gloo"
。它們可以作為屬性存取,例如,Backend.NCCL
。可以直接呼叫此類別來解析字串,例如,
Backend(backend_str)
將檢查backend_str
是否有效,如果有效,則傳回已解析的小寫字串。它也接受大寫字串,例如,Backend("GLOO")
傳回"gloo"
。注意
條目
Backend.UNDEFINED
存在,但僅用作某些欄位的初始值。使用者不應直接使用它,也不應假設它的存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[原始碼][原始碼]¶
使用給定的名稱和實例化函數註冊新的 backend。
此類別方法由第三方
ProcessGroup
擴充功能用於註冊新的 backends。- 參數
name (str) –
ProcessGroup
擴充功能的 Backend 名稱。它應與init_process_group()
中的名稱相符。func (function) – 實例化 backend 的函數處理器。該函數應在 backend 擴充功能中實作,並接受四個引數,包括
store
、rank
、world_size
和timeout
。extended_api (bool, optional) – Backend 是否支援擴充的引數結構。預設值:
False
。如果設定為True
,backend 將取得c10d::DistributedBackendOptions
的實例,以及由 backend 實作定義的程序群組選項物件。device (str 或 str 的 list, optional) – 此 backend 支援的裝置類型,例如“cpu”、“cuda”等。如果為 None,則假設同時支援“cpu”和“cuda”
注意
此第三方 backend 的支援是實驗性的,可能會發生變更。
- torch.distributed.get_backend(group=None)[原始碼][原始碼]¶
傳回給定程序群組的 backend。
- 參數
group (ProcessGroup, optional) – 要處理的程序群組。預設值為一般主程序群組。如果指定了另一個特定群組,則呼叫程序必須是
group
的一部分。- Returns
給定程序群組的 backend,以小寫字串表示。
- 回傳類型
關機¶
務必在退出時透過呼叫 destroy_process_group()
清理資源。
最簡單的模式是呼叫 destroy_process_group()
並使用 None 作為 group 參數的預設值,來銷毀每個 process group 和 backend。這個呼叫應該在訓練腳本中不再需要通訊的點進行,通常是在 main() 接近尾聲的地方。每次訓練器進程 (trainer-process) 應該呼叫一次,而不是在最外層的進程啟動器層級呼叫。
如果在逾時時間內,process group (pg) 中的所有 rank 都沒有呼叫 destroy_process_group()
,特別是當應用程式中有多個 process group 時(例如,用於 N 維並行處理),可能會在退出時發生停滯。這是因為 ProcessGroupNCCL 的解構子會呼叫 ncclCommAbort,必須集體呼叫該函式,但如果 Python 的 GC 呼叫 ProcessGroupNCCL 的解構子的順序是不確定的。呼叫 destroy_process_group()
有助於確保 ncclCommAbort 在所有 rank 中以一致的順序呼叫,並避免在 ProcessGroupNCCL 的解構子中呼叫 ncclCommAbort。
重新初始化¶
destroy_process_group 也可以用於銷毀個別的 process group。一個使用案例可能是容錯訓練,其中一個 process group 可能被銷毀,然後在執行期間初始化一個新的 process group。在這種情況下,在呼叫 destroy *之後* 和後續初始化*之前*,使用 torch.distributed 之外的其他方法同步訓練器進程至關重要。由於實現這種同步的困難,目前不支持/未測試此行為,並且被認為是已知問題。如果這個使用案例阻礙了您的工作,請提交 github issue 或 RFC。
群組 (Groups)¶
預設情況下,collectives 在預設群組(也稱為 world)上運作,並要求所有進程進入分散式函式呼叫。但是,某些工作負載可以從更細微的通訊中受益。這就是分散式群組發揮作用的地方。new_group()
函式可用於建立新的群組,包含所有進程的任意子集。它會傳回一個不透明的群組句柄,可以作為 group
參數傳遞給所有 collectives(collectives 是分散式函式,用於以某些廣為人知的程式設計模式交換資訊)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[原始碼][原始碼]¶
建立新的分散式群組。
此函式要求主群組中的所有進程(即,作為分散式任務一部分的所有進程)都進入此函式,即使它們不打算成為群組的成員。此外,應該以相同的順序在所有進程中建立群組。
警告
安全並行使用:當使用具有
NCCL
backend 的多個 process group 時,使用者必須確保所有 rank 的 collectives 具有全域一致的執行順序。如果一個進程中的多個執行緒發出 collectives,則需要顯式同步以確保一致的順序。
當使用 torch.distributed 通訊 API 的非同步變體時,會傳回一個 work 物件,並且通訊核心會在單獨的 CUDA stream 上排隊,允許通訊和計算重疊。一旦在一個 process group 上發出一個或多個非同步操作,就必須透過呼叫 work.wait() 將它們與其他 cuda stream 同步,然後才能使用另一個 process group。
更多詳細資訊請參閱 同時使用多個 NCCL communicators。
- 參數
ranks (list[int]) – 群組成員的 rank 列表。如果為
None
,則會設定為所有 rank。預設值為None
。timeout (timedelta, optional) – 有關詳細資訊和預設值,請參閱 init_process_group。
backend (str or Backend, optional) – 要使用的 backend。根據建置時的設定,有效值為
gloo
和nccl
。預設情況下,使用與全域群組相同的 backend。此欄位應以小寫字串的形式給定(例如,"gloo"
),也可以透過Backend
屬性(例如,Backend.GLOO
)存取。如果傳入None
,則將使用與預設 process group 對應的 backend。預設值為None
。pg_options (ProcessGroupOptions, optional) – process group 選項,指定在建構特定 process group 期間需要傳入哪些其他選項。例如,對於
nccl
backend,可以指定is_high_priority_stream
,以便 process group 可以選擇高優先順序的 cuda stream。有關配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, optional) – 在 process group 建立結束時執行群組本地 barrier。這與非成員 rank 不需要呼叫 API 並且不加入 barrier 的情況不同。
group_desc (str, optional) – 用於描述處理程序群組的字串。
device_id (torch.device, optional) – 單個、特定的裝置,用於將此處理程序「繫結」到該裝置。如果提供此欄位,new_group 呼叫會嘗試立即為該裝置初始化一個通訊後端。
- Returns
可傳遞給 collective 呼叫的分布式群組的句柄,如果 rank 不是
ranks
的一部分,則為 GroupMember.NON_GROUP_MEMBER。
注意:use_local_synchronization 不適用於 MPI。
注意:雖然 use_local_synchronization=True 在較大的叢集和小的處理程序群組中可以顯著更快,但必須小心,因為它會更改叢集行為,因為非成員 rank 不會加入群組 barrier()。
注意:當每個 rank 創建多個重疊的處理程序群組時,use_local_synchronization=True 可能會導致死鎖。 為了避免這種情況,請確保所有 rank 遵循相同的全域創建順序。
- torch.distributed.get_group_rank(group, global_rank)[source][source]¶
將全域 rank 轉換為群組 rank。
global_rank
必須是group
的一部分,否則會引發 RuntimeError。- 參數
group (ProcessGroup) – 尋找相對 rank 的 ProcessGroup。
global_rank (int) – 要查詢的全域 rank。
- Returns
global_rank
相對於group
的群組 rank- 回傳類型
注意:在預設處理程序群組上呼叫此函式會傳回 identity
DeviceMesh¶
DeviceMesh 是一種更高等級的抽象概念,用於管理處理程序群組(或 NCCL 通訊器)。 它允許使用者輕鬆創建節點間和節點內的處理程序群組,而無需擔心如何為不同的子處理程序群組正確設定 rank,並且它有助於輕鬆管理這些分散式處理程序群組。 init_device_mesh()
函數可用於創建新的 DeviceMesh,其網格形狀描述了裝置拓撲。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[source][source]¶
DeviceMesh 代表裝置的網格,其中裝置的佈局可以表示為 n 維陣列,並且 n 維陣列的每個值都是預設處理程序群組 rank 的全域 ID。
DeviceMesh 可用於描述叢集中裝置的佈局,並充當叢集中裝置清單之間通訊的代理。
DeviceMesh 可以用作上下文管理器。
注意
DeviceMesh 遵循 SPMD 編程模型,這意味著相同的 PyTorch Python 程式在叢集中的所有處理程序/rank 上執行。 因此,使用者需要確保 mesh 陣列(描述裝置的佈局)在所有 rank 中都相同。 不一致的 mesh 將導致靜默掛起。
- 參數
device_type (str) – 網格的裝置類型。 目前支援:"cpu"、"cuda/cuda-like"。
mesh (ndarray) – 多維陣列或整數張量,用於描述裝置的佈局,其中 ID 是預設處理程序群組的全域 ID。
- Returns
代表設備佈局的
DeviceMesh
物件。- 回傳類型
以下程式以 SPMD 方式在每個處理程序/rank 上執行。 在此範例中,我們有 2 個主機,每個主機有 4 個 GPU。 對於網格的第一個維度進行縮減將跨列 (0, 4)、.. 和 (3, 7) 進行縮減,對於網格的第二個維度進行縮減將跨行 (0, 1, 2, 3) 和 (4, 5, 6, 7) 進行縮減。
- Example:
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
- static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source][source]¶
從現有的
ProcessGroup
構建一個具有device_type
的DeviceMesh
。構建的設備網格的維度數量等於傳遞的群組數量。 如果傳遞了多個群組,則需要
mesh
參數。- 回傳類型
- get_all_groups()[source][source]¶
返回所有網格維度的 ProcessGroup 的列表。
- Returns
ProcessGroup
物件的列表。- 回傳類型
List[ProcessGroup]
- get_group(mesh_dim=None)[source][source]¶
返回由 mesh_dim 指定的單個 ProcessGroup,或者,如果未指定 mesh_dim 並且 DeviceMesh 是 1 維的,則返回網格中唯一的 ProcessGroup。
- 參數
mesh_dim (str/python:int, optional) – 它可以是網格維度的名稱或索引
None. (網格維度的,預設為) –
- Returns
一個
ProcessGroup
物件。- 回傳類型
ProcessGroup
- get_local_rank(mesh_dim=None)[source][source]¶
返回 DeviceMesh 的給定 mesh_dim 的本地 rank。
- 參數
mesh_dim (str/python:int, optional) – 它可以是網格維度的名稱或索引
None. (網格維度的,預設為) –
- Returns
一個表示本地 rank 的整數。
- 回傳類型
以下程式在 SPMD 模式下的每個程序/rank 上運行。 在這個例子中,我們有 2 台主機,每台主機有 4 個 GPU。 在 rank 0、1、2、3 上呼叫 mesh_2d.get_local_rank(mesh_dim=0) 將返回 0。 在 rank 4、5、6、7 上呼叫 mesh_2d.get_local_rank(mesh_dim=0) 將返回 1。 在 rank 0、4 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 0。 在 rank 1、5 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 1。 在 rank 2、6 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 2。 在 rank 3、7 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 3。
- Example:
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
點對點通訊¶
- torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]¶
同步傳送一個 Tensor。
警告
NCCL 後端不支援
tag
。
- torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source][source]¶
同步接收一個 tensor。
警告
NCCL 後端不支援
tag
。- 參數
- Returns
如果不在群組中,則發送者排名為 -1
- 回傳類型
當使用 isend()
和 irecv()
時,會傳回分散式請求物件。 一般來說,此物件的類型是不確定的,因為它們不應該手動建立,但保證它們支援兩種方法
is_completed()
- 如果操作已完成,則傳回 Truewait()
- 將阻塞程序直到操作完成。 保證is_completed()
在其傳回後傳回 True。
- torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]¶
非同步發送一個 tensor。
警告
在請求完成之前修改
tensor
會導致未定義的行為。警告
NCCL 後端不支援
tag
。與會阻塞的 send 不同,isend 允許 src == dst 排名,即傳送到自己。
- torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source][source]¶
非同步接收一個 tensor。
警告
NCCL 後端不支援
tag
。與會阻塞的 recv 不同,irecv 允許 src == dst 排名,即從自己接收。
- 參數
- Returns
一個分散式請求物件。 如果不在群組中,則為 None
- 回傳類型
- torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None)[source][source]¶
同步發送
object_list
中可 picklable 的物件。類似於
send()
,但可以傳入 Python 物件。 請注意,object_list
中的所有物件都必須是可 picklable 才能被發送。- 參數
object_list (List[Any]) – 要傳送的輸入物件列表。每個物件都必須是可 pickle 的。接收端必須提供相同大小的列表。
dst (int) – 傳送
object_list
的目標 rank。目標 rank 是基於全域處理序群組 (無論group
參數為何)group (Optional[ProcessGroup]) – (ProcessGroup, optional): 要處理的處理序群組。如果為 None,將使用預設處理序群組。預設值為
None
。device (
torch.device
, optional) – 如果不為 None,則物件將被序列化並轉換為 tensors,這些 tensors 會在傳送前移動到device
。預設值為None
。group_dst (int, optional) –
group
上的目標 rank。必須指定dst
和group_dst
其中一個,但不能同時指定。
- Returns
None
.
注意
對於基於 NCCL 的處理序群組,物件的內部 tensor 表示必須在通訊發生前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()
給出,使用者有責任確保已進行設定,以便每個 rank 都有一個單獨的 GPU,透過torch.cuda.set_device()
。警告
send_object_list()
隱式地使用pickle
模組,已知它是不安全的。可以構建惡意的 pickle 資料,這些資料在 unpickling 期間會執行任意程式碼。僅在您信任的資料上呼叫此函數。警告
使用 GPU tensors 呼叫
send_object_list()
並不支援,且效率低下,因為它會產生 GPU -> CPU 的傳輸,因為 tensors 將被 pickle。請考慮改用send()
。- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None)[source][source]¶
同步接收
object_list
中的可 pickle 物件。類似於
recv()
,但可以接收 Python 物件。- 參數
object_list (List[Any]) – 要接收到的物件列表。必須提供一個大小等於所傳送列表大小的列表。
src (int, optional) – 要從中接收
object_list
的來源 rank。來源 rank 是基於全域處理序群組 (無論group
參數為何)。如果設定為 None,將從任何 rank 接收。預設值為None
。group (Optional[ProcessGroup]) – (ProcessGroup, optional): 要處理的處理序群組。如果為 None,將使用預設處理序群組。預設值為
None
。device (
torch.device
, optional) – 如果不為 None,則在此裝置上接收。預設值為None
。group_src (int, optional) –
group
上的目標排名。 指定src
和group_src
無效。
- Returns
傳送者 rank。如果 rank 不是群組的一部分,則為 -1。如果 rank 是群組的一部分,則
object_list
將包含來自src
rank 的已傳送物件。
注意
對於基於 NCCL 的處理序群組,物件的內部 tensor 表示必須在通訊發生前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()
給出,使用者有責任確保已進行設定,以便每個 rank 都有一個單獨的 GPU,透過torch.cuda.set_device()
。警告
recv_object_list()
隱式地使用pickle
模組,已知它是不安全的。可以構建惡意的 pickle 資料,這些資料在 unpickling 期間會執行任意程式碼。僅在您信任的資料上呼叫此函數。警告
使用 GPU tensors 呼叫
recv_object_list()
並不支援,且效率低下,因為它會產生 GPU -> CPU 的傳輸,因為 tensors 將被 pickle。請考慮改用recv()
。- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.batch_isend_irecv(p2p_op_list)[source][source]¶
非同步地傳送或接收一批 tensors 並傳回請求列表。
處理
p2p_op_list
中的每個操作並傳回相應的請求。目前支援 NCCL、Gloo 和 UCC 後端。- 參數
p2p_op_list (List[P2POp]) – 點對點操作列表(每個運算子的類型都是
torch.distributed.P2POp
)。列表中 isend/irecv 的順序很重要,它需要與遠端的相應 isend/irecv 相匹配。- Returns
透過呼叫 op_list 中相應的 op 傳回的分散式請求物件列表。
- 回傳類型
範例
>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size) >>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1
注意
請注意,當此 API 與 NCCL PG 後端一起使用時,使用者必須使用 torch.cuda.set_device 設定目前的 GPU 裝置,否則會導致意外的掛起問題。
此外,如果此 API 是傳遞給
dist.P2POp
的group
中的第一個集合呼叫,則group
的所有 rank 都必須參與此 API 呼叫;否則,行為是未定義的。如果此 API 呼叫不是group
中的第一個集合呼叫,則允許僅涉及group
的 rank 子集的批次 P2P 操作。
- class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source][source]¶
一個用於為
batch_isend_irecv
構建點對點操作的類別。這個類別構建了 P2P 操作的類型、通訊緩衝區、對等 rank、進程組和標籤。這個類別的實例將傳遞給
batch_isend_irecv
以進行點對點通訊。- 參數
op (Callable) – 一個將資料傳送至或從對等進程接收資料的函式。
op
的類型是torch.distributed.isend
或torch.distributed.irecv
。tensor (Tensor) – 要傳送或接收的 Tensor。
peer (int, optional) – 目的地或來源 rank。
group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
tag (int, optional) – 將傳送與接收進行匹配的標籤。
group_peer (int, optional) – 目的地或來源 rank。
同步和非同步集合操作¶
每個集合操作函式都支援以下兩種操作,具體取決於傳遞到集合中的 async_op
標誌的設定
同步操作 - 預設模式,當 async_op
設定為 False
時。當函式返回時,保證執行集合操作。在 CUDA 操作的情況下,不保證 CUDA 操作完成,因為 CUDA 操作是非同步的。對於 CPU 集合,任何利用集合呼叫輸出的進一步函式呼叫都將按預期執行。對於 CUDA 集合,在同一 CUDA stream 上利用輸出的函式呼叫將按預期執行。使用者必須注意在不同 stream 下執行時的同步。有關 CUDA 語義(例如 stream 同步)的詳細資訊,請參閱CUDA 語義。請參閱下面的腳本,以查看 CPU 和 CUDA 操作在這些語義上的差異範例。
非同步操作 - 當 async_op
設定為 True 時。集合操作函式返回一個分散式請求物件。通常,您不需要手動建立它,並且保證它支援兩種方法
is_completed()
- 在 CPU 集合的情況下,如果完成則返回True
。在 CUDA 操作的情況下,如果操作已成功排隊到 CUDA stream 上,並且可以在預設 stream 上使用輸出而無需進一步同步,則返回True
。wait()
- 在 CPU 集合的情況下,將阻塞進程直到操作完成。在 CUDA 集合的情況下,將阻塞直到操作已成功排隊到 CUDA stream 上,並且可以在預設 stream 上使用輸出而無需進一步同步。get_future()
- 返回torch._C.Future
物件。支援 NCCL,也支援 GLOO 和 MPI 上的大多數操作,除了點對點操作。注意:隨著我們繼續採用 Futures 並合併 API,get_future()
呼叫可能會變得多餘。
範例
以下程式碼可用作使用分散式集合時 CUDA 操作語義的參考。它顯示了在不同 CUDA stream 上使用集合輸出時的顯式同步需求
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# if the explicit call to wait_stream was omitted, the output below will be
# non-deterministically 1 or 101, depending on whether the allreduce overwrote
# the value after the add completed.
print(output)
集合函式¶
- torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[原始碼][原始碼]¶
將 tensor 廣播至整個群組。
tensor
在參與集合運算的各個進程中,必須具有相同的元素數量。- 參數
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
- torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[原始碼][原始碼]¶
將
object_list
中的可 pickle 物件廣播至整個群組。類似於
broadcast()
,但可以傳入 Python 物件。請注意,object_list
中的所有物件都必須是可 pickle 的,才能進行廣播。- 參數
object_list (List[Any]) – 要廣播的輸入物件清單。每個物件都必須是可 pickle 的。只有
src
rank 上的物件會被廣播,但每個 rank 都必須提供大小相同的清單。src (int) – 要從中廣播
object_list
的來源 rank。來源 rank 是根據全域進程群組 (不論group
引數為何)group (Optional[ProcessGroup]) – (ProcessGroup, optional): 要處理的處理序群組。如果為 None,將使用預設處理序群組。預設值為
None
。device (
torch.device
, optional) – 如果不是 None,則物件會序列化並轉換為 tensor,然後移至device
後再進行廣播。預設值為None
。group_src (int) –
group
上的來源 rank。不得指定group_src
和src
其中一個,但不能同時指定。
- Returns
None
。如果 rank 是群組的一部分,則object_list
將包含來自src
rank 的廣播物件。
注意
對於基於 NCCL 的處理序群組,物件的內部 tensor 表示必須在通訊發生前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()
給出,使用者有責任確保已進行設定,以便每個 rank 都有一個單獨的 GPU,透過torch.cuda.set_device()
。注意
請注意,這個 API 與
broadcast()
集合運算略有不同,因為它未提供async_op
控制代碼,因此將會是阻塞呼叫。警告
broadcast_object_list()
隱含地使用pickle
模組,已知它是不安全的。可以建構惡意的 pickle 資料,在 unpickling 期間執行任意程式碼。僅使用您信任的資料呼叫此函式。警告
使用 GPU tensor 呼叫
broadcast_object_list()
並未受到良好支援且效率不彰,因為它會產生 GPU -> CPU 傳輸,因為 tensor 會被 pickled。請考慮改用broadcast()
。- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]¶
在所有機器上歸約(Reduce)張量資料,使所有機器都獲得最終結果。
呼叫
tensor
後,所有進程中的數值將完全相同。支援複數張量。
- 參數
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
範例
>>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
- torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source][source]¶
在所有機器上歸約張量資料。
只有 rank 為
dst
的進程會收到最終結果。- 參數
tensor (Tensor) – 集體運算的輸入和輸出。此函數執行原地(in-place)操作。
dst (int) – 全域處理程序群組中的目標排名 (無論
group
參數如何)op (optional) –
torch.distributed.ReduceOp
列舉中的其中一個值。指定用於逐元素歸約的運算。group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
async_op (bool, optional) – 這個運算是否應為非同步運算
group_dst (int) –
group
上的目標 rank。必須指定group_dst
和dst
之一,但不能同時指定兩者。
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source][source]¶
將整個群組的張量收集到一個列表中。
支援複數和大小不均勻的張量。
- 參數
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
範例
>>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source][source]¶
從所有 rank 收集張量,並將它們放入單個輸出張量中。
此函數要求每個進程上的所有張量大小相同。
- 參數
output_tensor (Tensor) – 輸出張量,用於容納來自所有 rank 的張量元素。它必須正確調整大小,才能具有以下形式之一:(i)沿主要維度連接所有輸入張量;有關「連接」的定義,請參閱
torch.cat()
;(ii)沿主要維度堆疊所有輸入張量;有關「堆疊」的定義,請參閱torch.stack()
。下面的範例可以更好地說明支援的輸出形式。input_tensor (Tensor) – 要從目前 rank 收集的張量。與
all_gather
API 不同,此 API 中的輸入張量在所有 rank 中必須具有相同的大小。group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
async_op (bool, optional) – 這個運算是否應為非同步運算
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
範例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # Output in concatenation form >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # Output in stack form >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1
警告
Gloo 後端不支援此 API。
- torch.distributed.all_gather_object(object_list, obj, group=None)[source][source]¶
將整個群組中可 pickle 的物件收集到一個列表中。
與
all_gather()
類似,但可以傳入 Python 物件。請注意,物件必須可 pickle 才能被收集。- 參數
object_list (list[Any]) – 輸出列表。其大小應與此集體運算群組的大小相同,並且將包含輸出。
obj (Any) – 要從目前進程廣播的可 pickle Python 物件。
group (ProcessGroup, optional) – 要使用的處理序群組。如果為 None,將使用預設的處理序群組。預設值為
None
。
- Returns
None。如果呼叫的 rank 屬於此群組,則 collective 的輸出將會填入輸入的
object_list
中。如果呼叫的 rank 不屬於此群組,則傳入的object_list
將不會被修改。
注意
請注意,此 API 與
all_gather()
collective 略有不同,因為它不提供async_op
控制代碼,因此將會是一個阻塞呼叫。注意
對於基於 NCCL 的處理序群組,物件的內部張量表示必須在進行通訊之前移至 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()
給定,並且使用者有責任確保已進行設定,以便每個 rank 都具有一個單獨的 GPU,透過torch.cuda.set_device()
進行設定。警告
all_gather_object()
隱式使用pickle
模組,已知該模組是不安全的。可以建構惡意的 pickle 資料,這些資料會在反序列化期間執行任意程式碼。僅使用您信任的資料呼叫此函數。警告
使用 GPU 張量呼叫
all_gather_object()
並不支援,且效率低下,因為會產生 GPU -> CPU 傳輸,因為張量將被 pickled。請考慮改用all_gather()
。- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
- torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source][source]¶
在單個處理序中收集張量清單。
此函數要求每個進程上的所有張量大小相同。
- 參數
tensor (Tensor) – 輸入張量。
gather_list (list[Tensor], optional) – 用於收集資料的適當的相同大小的張量清單 (預設值為 None,必須在目標 rank 上指定)
dst (int, optional) – 全域處理序群組中的目標 rank (不論
group
參數)。 (如果dst
和group_dst
都為 None,則預設值為全域 rank 0)group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
async_op (bool, optional) – 這個運算是否應為非同步運算
group_dst (int, optional) –
group
上的目標排名。 指定dst
和group_dst
無效
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
注意
請注意,gather_list 中的所有張量都必須具有相同的大小。
- Example:
>>> # We have 2 process groups, 2 ranks. >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.ones(tensor_size, device=device) + rank >>> if dist.get_rank() == 0: >>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)] >>> else: >>> gather_list = None >>> dist.gather(tensor, gather_list, dst=0) >>> # Rank 0 gets gathered data. >>> gather_list [tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0 None # Rank 1
- torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source][source]¶
在單個處理序中從整個群組收集可 picklable 的物件。
類似於
gather()
,但可以傳入 Python 物件。請注意,物件必須是可 picklable 的才能被收集。- 參數
obj (Any) – 輸入物件。必須是可 picklable 的。
object_gather_list (list[Any]) – 輸出清單。在
dst
rank 上,它應該正確地調整為此 collective 群組的大小,並將包含輸出。在非 dst rank 上必須為None
。(預設值為None
)dst (int, optional) – 全域處理序群組中的目標 rank (不論
group
參數)。 (如果dst
和group_dst
都為 None,則預設值為全域 rank 0)group (Optional[ProcessGroup]) – (ProcessGroup, optional): 要處理的處理序群組。如果為 None,將使用預設處理序群組。預設值為
None
。group_dst (int, optional) –
group
上的目標排名。 指定dst
和group_dst
無效
- Returns
None。在
dst
rank 上,object_gather_list
將包含 collective 的輸出。
注意
請注意,此 API 與 gather collective 略有不同,因為它不提供 async_op 控制代碼,因此將會是一個阻塞呼叫。
注意
對於基於 NCCL 的處理序群組,物件的內部張量表示必須在進行通訊之前移至 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()
給定,並且使用者有責任確保已進行設定,以便每個 rank 都具有一個單獨的 GPU,透過torch.cuda.set_device()
進行設定。警告
gather_object()
隱式使用pickle
模組,已知該模組是不安全的。可以建構惡意的 pickle 資料,這些資料會在反序列化期間執行任意程式碼。僅使用您信任的資料呼叫此函數。警告
使用 GPU 張量呼叫
gather_object()
並未獲得良好支援,且效率不佳,因為這會導致 GPU -> CPU 傳輸,因為張量將被 pickle 序列化。請考慮改用gather()
。- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}]
- torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[原始碼][原始碼]¶
將張量列表分散到群組中的所有進程。
每個進程將接收到一個張量,並將其資料儲存在
tensor
引數中。支援複數張量。
- 參數
tensor (Tensor) – 輸出張量。
scatter_list (list[Tensor]) – 要分散的張量列表 (預設為 None,必須在來源排名上指定)
src (int) – 全域進程群組上的來源排名 (無論
group
引數如何)。 (如果src
和group_src
均為 None,則預設為全域排名 0)group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
async_op (bool, optional) – 這個運算是否應為非同步運算
group_src (int, optional) –
group
上的來源排名。同時指定src
和group_src
無效
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
注意
請注意,scatter_list 中的所有張量都必須具有相同的大小。
- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> output_tensor = torch.zeros(tensor_size, device=device) >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> # Only tensors, all of which must be the same size. >>> t_ones = torch.ones(tensor_size, device=device) >>> t_fives = torch.ones(tensor_size, device=device) * 5 >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i gets scatter_list[i]. >>> output_tensor tensor([1., 1.], device='cuda:0') # Rank 0 tensor([5., 5.], device='cuda:1') # Rank 1
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[原始碼][原始碼]¶
將
scatter_object_input_list
中的可 pickle 物件分散到整個群組。類似於
scatter()
,但可以傳入 Python 物件。在每個排名上,分散的物件將儲存為scatter_object_output_list
的第一個元素。請注意,scatter_object_input_list
中的所有物件都必須是可 pickle 序列化的,才能被分散。- 參數
scatter_object_output_list (List[Any]) – 非空列表,其第一個元素將儲存分散到此排名的物件。
scatter_object_input_list (List[Any], optional) – 要分散的輸入物件列表。每個物件都必須是可 pickle 序列化的。只有
src
排名上的物件才將被分散,並且非 src 排名的引數可以為None
。src (int) – 從中分散
scatter_object_input_list
的來源排名。來源排名基於全域進程群組 (無論group
引數如何)。 (如果src
和group_src
均為 None,則預設為全域排名 0)group (Optional[ProcessGroup]) – (ProcessGroup, optional): 要處理的處理序群組。如果為 None,將使用預設處理序群組。預設值為
None
。group_src (int, optional) –
group
上的來源排名。同時指定src
和group_src
無效
- Returns
None
。如果排名是群組的一部分,則scatter_object_output_list
的第一個元素將設定為此排名的分散物件。
注意
請注意,此 API 與分散集合略有不同,因為它不提供
async_op
控制代碼,因此將是一個封鎖呼叫。警告
scatter_object_list()
隱式使用pickle
模組,而該模組已知是不安全的。可以建構惡意的 pickle 資料,這些資料將在解 pickle 期間執行任意程式碼。僅使用您信任的資料呼叫此函式。警告
使用 GPU 張量呼叫
scatter_object_list()
並未獲得良好支援,且效率不佳,因為這會導致 GPU -> CPU 傳輸,因為張量將被 pickle 序列化。請考慮改用scatter()
。- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}]
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[原始碼][原始碼]¶
縮減,然後將張量列表分散到群組中的所有進程。
- 參數
- Returns
如果 async_op 設定為 True,則為非同步工作控制代碼 (Async work handle)。 如果不是 async_op 或不屬於群組,則為 None。
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]¶
縮減,然後將 tensor 分散到群組中的所有 rank。
- 參數
output (Tensor) – 輸出 tensor。 它在所有 rank 中應具有相同的大小。
input (Tensor) – 要縮減和分散的輸入 tensor。 其大小應為輸出 tensor 大小乘以 world size。 輸入 tensor 可以具有以下形狀之一:(i)沿主要維度的輸出 tensor 的串聯,或(ii)沿主要維度的輸出 tensor 的堆疊。 有關“串聯”的定義,請參見
torch.cat()
。 有關“堆疊”的定義,請參見torch.stack()
。group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
async_op (bool, optional) – 此操作是否應該為非同步操作 (async op)。
- Returns
如果 async_op 設定為 True,則為非同步工作控制代碼 (Async work handle)。 如果不是 async_op 或不屬於群組,則為 None。
範例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # Input in concatenation form >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # Input in stack form >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
警告
Gloo 後端不支援此 API。
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source][source]¶
分割輸入 tensor,然後將分割的列表分散到群組中的所有進程。
稍後,接收到的 tensors 會從群組中的所有進程串聯起來,並作為單個輸出 tensor 回傳。
支援複數張量。
- 參數
output (Tensor) – 收集的串聯輸出 tensor。
input (Tensor) – 要分散的輸入 tensor。
output_split_sizes – (list[Int], optional): 如果指定為 None 或空,則為 dim 0 的輸出分割大小,
output
tensor 的 dim 0 必須能被world_size
整除。input_split_sizes – (list[Int], optional): 如果指定為 None 或空,則為 dim 0 的輸入分割大小,
input
tensor 的 dim 0 必須能被world_size
整除。group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
async_op (bool, optional) – 此操作是否應該為非同步操作 (async op)。
- Returns
如果 async_op 設定為 True,則為非同步工作控制代碼 (Async work handle)。 如果不是 async_op 或不屬於群組,則為 None。
警告
all_to_all_single 是實驗性的,並且可能會變更。
範例
>>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source][source]¶
將輸入 tensors 列表分散到群組中的所有進程,並在輸出列表中回傳收集的 tensors 列表。
支援複數張量。
- 參數
- Returns
如果 async_op 設定為 True,則為非同步工作控制代碼 (Async work handle)。 如果不是 async_op 或不屬於群組,則為 None。
警告
all_to_all 是實驗性的,並且可能會變更。
範例
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source][source]¶
同步所有進程。
如果 async_op 為 False,或者如果在 wait() 上呼叫了非同步工作控制代碼,則此 collective 會阻塞進程,直到整個群組進入此函式。
- 參數
group (ProcessGroup, optional) – 要處理的程序群組。如果為 None,將使用預設程序群組。
async_op (bool, optional) – 這個運算是否應為非同步運算
device_ids ([int], optional) – 設備/GPU ID 列表。
- Returns
如果 async_op 設為 True,則為非同步工作控制代碼。如果不是 async_op 或不屬於群組,則為 None
注意
ProcessGroupNCCL 現在會阻塞 CPU 執行緒,直到 barrier collective 完成。
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source][source]¶
同步進程,類似於
torch.distributed.barrier
,但會考慮可配置的逾時。它能夠回報在提供的逾時時間內,未通過此障礙點的 rank。具體來說,對於非零 rank,將會阻塞直到從 rank 0 處理了 send/recv。Rank 0 將會阻塞直到所有來自其他 rank 的 send/recv 被處理完畢,並且會回報在時間內未回應的 rank 的失敗情況。請注意,如果一個 rank 沒有到達 monitored_barrier (例如由於死鎖),則所有其他 rank 都會在 monitored_barrier 中失敗。
這個集合操作會阻塞群組中的所有進程/rank,直到整個群組成功退出該函數,使其對於除錯和同步非常有用。然而,它可能會對效能產生影響,應僅用於除錯或需要主機端完全同步點的情況。為了除錯的目的,可以在應用程式的集合呼叫之前插入此障礙點,以檢查是否有任何 rank 不同步。
注意
請注意,此集合操作僅支援 GLOO 後端。
- 參數
group (ProcessGroup, optional) – 要使用的進程群組。如果
None
,則會使用預設的進程群組。timeout (datetime.timedelta, optional) – monitored_barrier 的逾時時間。如果
None
,則會使用預設的進程群組逾時時間。wait_all_ranks (bool, optional) – 是否收集所有失敗的 rank。預設情況下,這是
False
,並且 rank 0 上的monitored_barrier
將會在遇到第一個失敗的 rank 時拋出錯誤,以便快速失敗。透過設定wait_all_ranks=True
,monitored_barrier
將會收集所有失敗的 rank,並拋出包含所有失敗 rank 資訊的錯誤。
- Returns
None
.
- Example:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier.
- class torch.distributed.Work¶
Work 物件代表 PyTorch 分散式套件中待處理的非同步操作的控制代碼。 它由非阻塞集合操作返回,例如 dist.all_reduce(tensor, async_op=True)。
- exception(self: torch._C._distributed_c10d.Work) std::__exception_ptr::exception_ptr ¶
- get_future(self: torch._C._distributed_c10d.Work) torch.Future ¶
- Returns
一個與
Work
完成相關聯的torch.futures.Future
物件。例如,可以透過fut = process_group.allreduce(tensors).get_future()
檢索 future 物件。
- Example:
以下是一個簡單的 allreduce DDP 通訊 hook 的範例,該範例使用
get_future` API 來檢索與 ``allreduce``
完成相關聯的 Future。>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future >>> group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD >>> tensor = bucket.buffer().div_(group_to_use.size()) >>> return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future() >>> ddp_model.register_comm_hook(state=None, hook=allreduce)
警告
get_future
API 支援 NCCL,以及部分 GLOO 和 MPI 後端(不支援點對點操作,例如 send/recv),並將傳回torch.futures.Future
。在上面的範例中,
allreduce
工作將使用 NCCL 後端在 GPU 上完成,fut.wait()
將在將適當的 NCCL streams 與 PyTorch 的當前裝置 streams 同步後傳回,以確保我們可以進行非同步 CUDA 執行,並且它不會等待整個操作在 GPU 上完成。請注意,CUDAFuture
不支援TORCH_NCCL_BLOCKING_WAIT
標誌或 NCCL 的barrier()
。此外,如果透過fut.then()
添加了回呼函式,它將等待直到WorkNCCL
的 NCCL streams 與ProcessGroupNCCL
的專用回呼 stream 同步,並在回呼 stream 上執行回呼後,內聯呼叫回呼。fut.then()
將傳回另一個CUDAFuture
,它持有回呼的傳回值和一個記錄回呼 stream 的CUDAEvent
。對於 CPU 工作,當工作完成且 value() tensors 準備就緒時,
fut.done()
傳回 true。對於 GPU 工作,僅當操作已排隊時,
fut.done()
才會傳回 true。對於混合 CPU-GPU 工作(例如,使用 GLOO 發送 GPU tensors),當 tensors 已到達各自的節點時,
fut.done()
傳回 true,但不一定在各自的 GPU 上同步(與 GPU 工作類似)。
- get_future_result(self: torch._C._distributed_c10d.Work) torch.Future ¶
- Returns
一個 int 类型的
torch.futures.Future
物件,它映射到 WorkResult 的枚舉类型。 例如,可以透過fut = process_group.allreduce(tensor).get_future_result()
檢索 future 物件。
- Example:
使用者可以使用
fut.wait()
進行阻擋式等待,直到工作完成,並透過fut.value()
取得 WorkResult。此外,使用者可以使用fut.then(call_back_func)
註冊一個回呼函式,在工作完成時呼叫,而不會阻擋目前的執行緒。
警告
get_future_result
API 支援 NCCL
- result(self: torch._C._distributed_c10d.Work) list[torch.Tensor] ¶
- wait(self: torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) bool ¶
- Returns
true/false (真/假)。
- Example:
- try (嘗試)
work.wait(timeout)
- except (例外)
# 一些處理
警告
在正常情況下,使用者不需要設定 timeout。呼叫 wait() 與呼叫 synchronize() 相同:讓目前的 stream 阻擋直到 NCCL 工作完成。然而,如果設定了 timeout,它會阻擋 CPU 執行緒直到 NCCL 工作完成或超時。如果超時,將會拋出例外。
- class torch.distributed.ReduceOp¶
一個類似 enum 的類別,用於可用的歸約運算:
SUM
、PRODUCT
、MIN
、MAX
、BAND
、BOR
、BXOR
和PREMUL_SUM
。使用
NCCL
後端時,BAND
、BOR
和BXOR
歸約不可用。AVG
會在跨 ranks 進行加總之前,將值除以 world size。AVG
僅在使用NCCL
後端時可用,並且僅適用於 NCCL 2.10 或更高版本。PREMUL_SUM
在歸約之前,會先將輸入值與給定的純量 (scalar) 在本地相乘。PREMUL_SUM
僅在使用NCCL
後端時可用,並且僅適用於 NCCL 2.11 或更高版本。使用者應該使用torch.distributed._make_nccl_premul_sum
。此外,
MAX
、MIN
和PRODUCT
不支援複數張量 (complex tensors)。此類別的值可以作為屬性存取,例如,
ReduceOp.SUM
。它們用於指定歸約集合體的策略,例如,reduce()
。此類別不支援
__members__
屬性。
分散式鍵-值儲存¶
分散式套件配有一個分散式鍵-值儲存,可用於在群組中的程序之間共享資訊,以及在 torch.distributed.init_process_group()
中初始化分散式套件 (透過顯式建立儲存作為指定 init_method
的替代方案)。鍵-值儲存有 3 種選擇:TCPStore
、FileStore
和 HashStore
。
- class torch.distributed.Store¶
所有儲存實作的基底類別,例如 PyTorch 分散式提供的 3 個類別:(
TCPStore
、FileStore
和HashStore
)。- add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int ¶
第一次對給定的
key
呼叫 add 會在儲存中建立與key
相關聯的計數器,並初始化為amount
。後續使用相同的key
呼叫 add 會將計數器增加指定的amount
。使用已經通過set()
在儲存中設定的鍵呼叫add()
將導致例外。- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key")
- append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None ¶
根據提供的
key
和value
將鍵-值對附加到儲存中。如果key
在儲存中不存在,則將建立它。- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.append("first_key", "po") >>> store.append("first_key", "tato") >>> # Should return "potato" >>> store.get("first_key")
- check(self: torch._C._distributed_c10d.Store, arg0: list[str]) bool ¶
呼叫 check 以檢查給定的
keys
列表是否在儲存中儲存了值。在正常情況下,此呼叫會立即返回,但仍然會遇到一些邊緣死鎖情況,例如在 TCPStore 銷毀後呼叫 check。 使用要檢查是否儲存在儲存中的金鑰列表呼叫check()
。- 參數
keys (lisr[str]) – 要查詢是否儲存在儲存區中的鍵 (keys) 清單。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> # Should return 7 >>> store.check(["first_key"])
- compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes ¶
根據提供的
key
將鍵值對插入儲存區,並在插入之前執行expected_value
和desired_value
之間的比較。只有當key
的expected_value
已經存在於儲存區中,或者expected_value
是空字串時,才會設定desired_value
。- 參數
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key")
- delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool ¶
從儲存區中刪除與
key
相關聯的鍵值對。如果成功刪除該鍵,則傳回 true,否則傳回 false。- 參數
key (str) – 要從儲存區中刪除的鍵
- Returns
如果已刪除
key
,則為 True,否則為 False。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key")
- get(self: torch._C._distributed_c10d.Store, arg0: str) bytes ¶
檢索儲存區中與給定的
key
相關聯的值。如果key
不存在於儲存區中,該函數將等待timeout
(在初始化儲存區時定義),然後拋出例外。- 參數
key (str) – 該函數將傳回與此鍵相關聯的值。
- Returns
如果
key
在儲存區中,則傳回與key
相關聯的值。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- multi_get(self: torch._C._distributed_c10d.Store, arg0: list[str]) list[bytes] ¶
檢索
keys
中的所有值。如果keys
中的任何鍵不存在於儲存區中,則該函數將等待timeout
。- 參數
keys (List[str]) – 要從儲存區檢索的鍵。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "po") >>> store.set("second_key", "tato") >>> # Should return [b"po", b"tato"] >>> store.multi_get(["first_key", "second_key"])
- multi_set(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: list[str]) None ¶
根據提供的
keys
和values
,將鍵值對列表插入到儲存區中。- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.multi_set(["first_key", "second_key"], ["po", "tato"]) >>> # Should return b"po" >>> store.get("first_key")
- num_keys(self: torch._C._distributed_c10d.Store) int ¶
傳回儲存區中設定的鍵的數量。請注意,此數字通常比
set()
和add()
新增的鍵的數量大 1,因為有一個鍵用於協調使用儲存區的所有工作人員。警告
當與
TCPStore
一起使用時,num_keys
傳回寫入底層檔案的鍵的數量。如果儲存區被銷毀,並且使用相同檔案建立了另一個儲存區,則原始鍵將被保留。- Returns
儲存區中存在的鍵的數量。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys()
- set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None ¶
根據提供的
key
和value
,將鍵值對插入到儲存區中。如果key
已經存在於儲存區中,它將使用新提供的value
覆寫舊值。- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None ¶
設定儲存區的預設逾時。此逾時用於初始化,以及
wait()
和get()
中。- 參數
timeout (timedelta) – 要在儲存區中設定的逾時。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"])
- property timeout¶
取得儲存區的逾時。
- wait(*args, **kwargs)¶
重載函數。
wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None
等待
keys
中的每個鍵被新增到儲存區。如果不是所有鍵都在timeout
(在儲存區初始化期間設定) 之前設定,則wait
將拋出例外。- 參數
keys (list) – 等待在 store 中設定的鍵列表。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"])
wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None
等待
keys
中的每個鍵被添加到 store,如果鍵在提供的timeout
時間內尚未設定,則拋出例外。- 參數
keys (list) – 等待在 store 中設定的鍵列表。
timeout (timedelta) – 在拋出例外之前,等待鍵被添加的時間長度。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10))
- class torch.distributed.TCPStore¶
基於 TCP 的分散式鍵值儲存實現。伺服器儲存保留資料,而客戶端儲存可以通過 TCP 連接到伺服器儲存並執行諸如
set()
以插入鍵值對,get()
以檢索鍵值對等操作。 應該始終初始化一個伺服器儲存,因為客戶端儲存將等待伺服器建立連線。- 參數
host_name (str) – 伺服器儲存應執行的主機名稱或 IP 位址。
port (int) – 伺服器儲存應監聽傳入請求的連接埠。
world_size (int, optional) – 儲存使用者的總數(客戶端數量 + 伺服器數量 1)。預設值為 None(None 表示儲存使用者的數量不固定)。
is_master (bool, optional) – 初始化伺服器儲存時為 True,客戶端儲存時為 False。預設值為 False。
timeout (timedelta, optional) – 儲存體在初始化期間以及諸如
get()
和wait()
等方法中使用的逾時。預設值為 timedelta(seconds=300)wait_for_workers (bool, optional) – 是否等待所有 worker 連接到伺服器儲存。這僅在 world_size 是固定值時適用。預設值為 True。
multi_tenant (bool, optional) – 如果為 True,則當前進程中具有相同主機/連接埠的所有
TCPStore
實例將使用相同的底層TCPServer
。預設值為 False。master_listen_fd (int, optional) – 如果指定,則底層
TCPServer
將監聽此檔案描述符,該描述符必須是一個已經繫結到port
的 socket。在某些情況下,這對於避免連接埠分配競爭很有用。預設值為 None(表示伺服器建立一個新的 socket 並嘗試將其繫結到port
)。use_libuv (bool, optional) – 如果為 True,則使用 libuv 作為
TCPServer
後端。預設值為 True。
- Example:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
- __init__(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: int, world_size: Optional[int] = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: Optional[int] = None, use_libuv: bool = True) None ¶
建立新的 TCPStore。
- property host¶
取得儲存監聽請求的主機名稱。
- property libuvBackend¶
如果使用 libuv 後端,則傳回 True。
- property port¶
取得 store 監聽請求的連接埠號碼。
- class torch.distributed.HashStore¶
基於底層雜湊表 (hashmap) 的執行緒安全 (thread-safe) store 實作。這個 store 可以在同一個程序 (例如,由其他執行緒) 中使用,但不能跨程序使用。
- Example:
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value")
- __init__(self: torch._C._distributed_c10d.HashStore) None ¶
建立一個新的 HashStore。
- class torch.distributed.FileStore¶
一個 store 實作,使用檔案來儲存底層的鍵值對 (key-value pairs)。
- 參數
- Example:
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
- __init__(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: int = -1) None ¶
建立一個新的 FileStore。
- property path¶
取得 FileStore 用來儲存鍵值對的檔案路徑。
- class torch.distributed.PrefixStore¶
任何 3 種鍵值 store (key-value store) 的封裝器 (
TCPStore
、FileStore
和HashStore
),它會將前綴新增到插入 store 的每個鍵。- 參數
prefix (str) – 前綴字串,在插入 store 之前會附加到每個鍵上。
store (torch.distributed.store) – 構成底層鍵值 store 的 store 物件。
- __init__(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) None ¶
建立一個新的 PrefixStore。
- property underlying_store¶
取得 PrefixStore 封裝的底層 store 物件。
分析集合式通訊 (Profiling Collective Communication)¶
請注意,您可以使用 torch.profiler
(推薦,僅在 1.8.1 之後版本可用) 或 torch.autograd.profiler
來分析此處提到的集合式通訊和點對點通訊 API。所有現成的後端 (gloo
、nccl
、mpi
) 都受到支援,並且集合式通訊的使用情況將如預期在分析輸出/追蹤中呈現。分析您的程式碼與分析任何常規的 torch 運算符號相同。
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
有關分析器功能的完整概述,請參閱分析器文件。
多 GPU 集合式函數 (Multi-GPU collective functions)¶
警告
多 GPU 函數(代表每個 CPU 執行緒多個 GPU)已被棄用。 截至今日,PyTorch Distributed 的首選編程模型是每個執行緒一個裝置,如此文件中 API 所例示。 如果您是後端開發人員並且想要支援每個執行緒多個裝置,請聯絡 PyTorch Distributed 的維護人員。
第三方後端 (Third-party backends)¶
除了內建的 GLOO/MPI/NCCL 後端之外,PyTorch distributed 還透過執行階段註冊機制支援第三方後端。 有關如何透過 C++ 擴充功能開發第三方後端的參考資訊,請參閱教學課程 - 自訂 C++ 和 CUDA 擴充功能和 test/cpp_extensions/cpp_c10d_extension.cpp
。 第三方後端的功能由其自身的實作決定。
新的後端衍生自 c10d::ProcessGroup
,並在匯入時透過 torch.distributed.Backend.register_backend()
註冊後端名稱和實例化介面。
當手動匯入此後端並使用對應的後端名稱呼叫 torch.distributed.init_process_group()
時,torch.distributed
套件會在新的後端上執行。
警告
對第三方後端的支援是實驗性的,可能會有所變動。
啟動工具¶
torch.distributed 套件也在 torch.distributed.launch 中提供了一個啟動工具。這個輔助工具可用於為分散式訓練啟動每個節點的多個進程。
模組 torch.distributed.launch
。
torch.distributed.launch
是一個模組,用於在每個訓練節點上產生多個分散式訓練進程。
警告
此模組將被棄用,取而代之的是 torchrun。
該工具可用於單節點分散式訓練,其中將產生每個節點一個或多個進程。該工具可用於 CPU 訓練或 GPU 訓練。 如果該工具用於 GPU 訓練,則每個分散式進程將在單個 GPU 上運行。這可以實現顯著改善的單節點訓練效能。它也可以用於多節點分散式訓練,方法是在每個節點上產生多個進程,以顯著改善多節點分散式訓練效能。這對於具有直接 GPU 支援的多個 Infiniband 介面的系統尤其有利,因為所有介面都可以用於聚合的通訊頻寬。
無論是單節點分散式訓練還是多節點分散式訓練,此工具都將啟動每個節點指定數量的進程 (--nproc-per-node
)。如果用於 GPU 訓練,則此數字需要小於或等於目前系統上的 GPU 數量 (nproc_per_node
),並且每個進程將在單個 GPU 上運行,從 *GPU 0 到 GPU (nproc_per_node - 1)*。
如何使用此模組
單節點多進程分散式訓練
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
多節點多進程分散式訓練: (例如,兩個節點)
節點 1: *(IP: 192.168.1.1,並具有一個可用連接埠: 1234)*
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
節點 2
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
要查詢此模組提供的可選參數
python -m torch.distributed.launch --help
重要注意事項
1. 此工具和多進程分散式(單節點或多節點)GPU 訓練目前僅在使用 NCCL 分散式後端時才能獲得最佳效能。 因此,建議使用 NCCL 後端進行 GPU 訓練。
2. 在您的訓練程式中,您必須解析命令列引數: --local-rank=LOCAL_PROCESS_RANK
,它將由該模組提供。如果您的訓練程式使用 GPU,則應確保您的程式碼僅在 LOCAL_PROCESS_RANK 的 GPU 裝置上執行。這可以透過以下方式完成
解析 local_rank 引數
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
使用以下任一方法將您的裝置設定為本機排序
>>> torch.cuda.set_device(args.local_rank) # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run
>>> ...
在 2.0.0 版本中變更:啟動器將 --local-rank=<rank>
引數傳遞給您的腳本。 從 PyTorch 2.0.0 開始,帶破折號的 --local-rank
優先於先前使用的帶底線的 --local_rank
。
為了向後相容性,使用者可能需要在其引數解析程式碼中處理這兩種情況。這意味著在引數解析器中同時包含 "--local-rank"
和 "--local_rank"
。如果僅提供 "--local_rank"
,則啟動器將觸發錯誤:「error: unrecognized arguments: –local-rank=<rank>」。對於僅支援 PyTorch 2.0.0+ 的訓練程式碼,包含 "--local-rank"
應該就足夠了。
3. 在您的訓練程式中,您應該在開始時呼叫以下函式來啟動分散式後端。強烈建議使用 init_method=env://
。其他初始化方法 (例如 tcp://
) 可能有效,但 env://
是此模組正式支援的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
4. 在您的訓練程式中,您可以使用常規的分散式函式或使用 torch.nn.parallel.DistributedDataParallel()
模組。如果您的訓練程式使用 GPU 進行訓練,並且您想使用 torch.nn.parallel.DistributedDataParallel()
模組,以下是如何配置它。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
請確保將 device_ids
引數設定為您的程式碼將在其上運作的唯一 GPU 裝置 ID。 這通常是該進程的本機排名。換句話說,為了使用此工具,device_ids
需要是 [args.local_rank]
,而 output_device
需要是 args.local_rank
5. 另一種透過環境變數 LOCAL_RANK
將 local_rank
傳遞給子進程的方法。 當您使用 --use-env=True
啟動腳本時,此行為已啟用。 您必須調整上面的子進程範例,以將 args.local_rank
替換為 os.environ['LOCAL_RANK']
; 當您指定此標誌時,啟動器將不會傳遞 --local-rank
。
警告
local_rank
不是全域唯一的:它僅在機器上的每個進程上是唯一的。 因此,不要使用它來決定您是否應該寫入網路檔案系統等。 有關如果未正確執行此操作可能發生的錯誤範例,請參閱 https://github.com/pytorch/pytorch/issues/12042。
產生工具¶
Multiprocessing package - torch.multiprocessing 套件也提供了一個 spawn
函式,位於 torch.multiprocessing.spawn()
。這個輔助函式可用於產生多個進程。它的運作方式是傳入您想要執行的函式,並產生 N 個進程來執行它。這也可用於多進程分散式訓練。
如需有關如何使用它的參考資訊,請參閱 PyTorch 範例 - ImageNet 實作。
請注意,此函式需要 Python 3.4 或更高版本。
除錯 torch.distributed
應用程式¶
由於難以理解的掛起、崩潰或跨 ranks 的不一致行為,除錯分散式應用程式可能具有挑戰性。torch.distributed
提供了一套工具,以自助方式幫助除錯訓練應用程式。
Python 中斷點¶
在分散式環境中使用 Python 的除錯器非常方便,但由於它無法直接使用,許多人根本不使用它。 PyTorch 提供了一個圍繞 pdb 的客製化 wrapper,簡化了該流程。
torch.distributed.breakpoint 使這個過程變得容易。在內部,它以兩種方式客製化 pdb 的中斷點行為,但在其他方面表現為正常的 pdb。 1. 僅在一個 rank(由使用者指定)上附加除錯器。 2. 透過使用 torch.distributed.barrier() 確保所有其他 ranks 停止,一旦除錯的 rank 發出 continue,它將釋放。 3. 重新導向來自子進程的 stdin,使其連接到您的終端。
若要使用它,只需在所有 ranks 上發出 torch.distributed.breakpoint(rank),在每種情況下都使用相同的值作為 rank。
監控的屏障 (Monitored Barrier)¶
從 v1.10 開始,torch.distributed.monitored_barrier()
作為 torch.distributed.barrier()
的替代方案存在,當崩潰時,它會提供有關哪個 rank 可能有故障的有用資訊,也就是說,並非所有 ranks 都在提供的逾時時間內呼叫 torch.distributed.monitored_barrier()
。 torch.distributed.monitored_barrier()
使用類似於確認的進程中的 send
/recv
通訊原語來實作主機端屏障,允許 rank 0 報告哪個或哪些 rank 未及時確認屏障。 作為範例,請考慮以下函式,其中 rank 1 未呼叫 torch.distributed.monitored_barrier()
(實際上,這可能是由於應用程式錯誤或先前集合中的掛起)。
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier requires gloo process group to perform host-side sync.
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
以下錯誤訊息在 rank 0 上產生,允許使用者確定哪個或哪些 rank 可能有故障並進一步調查。
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG
¶
透過 TORCH_CPP_LOG_LEVEL=INFO
,可以使用環境變數 TORCH_DISTRIBUTED_DEBUG
來觸發額外的有用記錄和集合同步檢查,以確保所有 ranks 都適當地同步。 TORCH_DISTRIBUTED_DEBUG
可以設定為 OFF
(預設)、INFO
或 DETAIL
,具體取決於所需除錯的層級。 請注意,最詳細的選項 DETAIL
可能會影響應用程式效能,因此應僅在除錯問題時使用。
設定 TORCH_DISTRIBUTED_DEBUG=INFO
將在使用 torch.nn.parallel.DistributedDataParallel()
訓練的模型初始化時,產生額外的除錯記錄,而 TORCH_DISTRIBUTED_DEBUG=DETAIL
將額外記錄運行時效能統計資訊選擇的迭代次數。 這些運行時統計資訊包括正向傳播時間、反向傳播時間、梯度通訊時間等資料。 作為範例,假設有以下應用程式
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
以下記錄在初始化時呈現
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
以下記錄在運行時呈現(當設定 TORCH_DISTRIBUTED_DEBUG=DETAIL
時)
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 40838608
Avg backward compute time: 5983335
Avg backward comm. time: 4326421
Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 42850427
Avg backward compute time: 3885553
Avg backward comm. time: 2357981
Avg backward comm/comp overlap time: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO
會增強 torch.nn.parallel.DistributedDataParallel()
中因模型中未使用的參數所造成的崩潰記錄。目前,如果前向傳遞中可能存在未使用的參數,則必須將 find_unused_parameters=True
傳遞到 torch.nn.parallel.DistributedDataParallel()
初始化中,且從 v1.10 開始,所有模型輸出都必須用於損失計算,因為 torch.nn.parallel.DistributedDataParallel()
不支援反向傳遞中未使用的參數。這些限制特別對大型模型來說是個挑戰,因此,當因錯誤而崩潰時,torch.nn.parallel.DistributedDataParallel()
將記錄所有未使用的參數的完整名稱。例如,在上述應用程式中,如果我們將 loss
修改為 loss = output[1]
,則 TwoLinLayerNet.a
在反向傳遞中不會收到梯度,因此會導致 DDP
失敗。發生崩潰時,使用者會收到有關未使用參數的資訊,對於大型模型來說,手動找到這些參數可能具有挑戰性。
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
設定 TORCH_DISTRIBUTED_DEBUG=DETAIL
將觸發對使用者直接或間接發出的每個集合呼叫(例如 DDP allreduce
)的其他一致性和同步檢查。 這是通過創建一個包裝器進程組來實現的,該包裝器進程組包裝了由 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 返回的所有進程組。 因此,這些 API 將返回一個包裝器進程組,其使用方式與常規進程組完全相同,但在將集合調度到基礎進程組之前執行一致性檢查。 目前,這些檢查包括一個 torch.distributed.monitored_barrier()
,它確保所有 rank 完成其未完成的集合呼叫,並報告卡住的 rank。 接下來,通過確保所有集合函數匹配並以一致的張量形狀調用來檢查集合本身的一致性。 如果不是這種情況,則在應用崩潰時包含詳細的錯誤報告,而不是掛起或提供無意義的錯誤消息。 例如,考慮以下函數,該函數在 torch.distributed.all_reduce()
中具有不匹配的輸入形狀
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用 NCCL
後端,這樣的應用程式很可能會導致掛起,這在非平凡的情況下可能難以找到根本原因。 如果使用者啟用 TORCH_DISTRIBUTED_DEBUG=DETAIL
並重新運行應用程式,則以下錯誤訊息會揭示根本原因
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10
[ torch.LongTensor{1} ]
注意
為了在運行時對調試級別進行細粒度控制,還可以使用函數 torch.distributed.set_debug_level()
、torch.distributed.set_debug_level_from_env()
和 torch.distributed.get_debug_level()
。
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以與 TORCH_SHOW_CPP_STACKTRACES=1 結合使用,以在檢測到集合解同步時記錄整個調用堆棧。 這些集合解同步檢查將適用於所有使用 c10d
集合呼叫的應用程式,這些集合呼叫由使用 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 創建的進程組支持。
日誌記錄¶
除了通過 torch.distributed.monitored_barrier()
和 TORCH_DISTRIBUTED_DEBUG
進行顯式的調試支持外,torch.distributed
的底層 C++ 函式庫也會輸出各種級別的日誌訊息。 這些訊息有助於了解分散式訓練作業的執行狀態,以及對網路連線失敗等問題進行疑難排解。 下面的矩陣顯示了如何通過 TORCH_CPP_LOG_LEVEL
和 TORCH_DISTRIBUTED_DEBUG
環境變數的組合來調整日誌級別。
|
|
有效日誌級別 |
---|---|---|
|
忽略 |
錯誤 |
|
忽略 |
警告 |
|
忽略 |
資訊 |
|
|
Debug |
|
|
追蹤 (又名全部) |
分散式元件引發從 RuntimeError 派生的自定義 Exception 類型
torch.distributed.DistError: 這是所有分散式異常的基本類型。
torch.distributed.DistBackendError: 當發生特定於後端的錯誤時,會引發此異常。 例如,如果使用 NCCL 後端,且使用者嘗試使用 NCCL 函式庫無法使用的 GPU。
torch.distributed.DistNetworkError: 當網路函式庫遇到錯誤時,會引發此異常 (例如:連線由對等方重設)
torch.distributed.DistStoreError: 當 Store 遇到錯誤時,會引發此異常 (例如:TCPStore 超時)
- class torch.distributed.DistError¶
在分散式函式庫中發生錯誤時引發的 Exception
- class torch.distributed.DistBackendError¶
當分散式中發生後端錯誤時引發的 Exception
- class torch.distributed.DistNetworkError¶
當分散式中發生網路錯誤時引發的 Exception
- class torch.distributed.DistStoreError¶
當分散式儲存 (distributed store) 發生錯誤時引發的例外。
如果您正在執行單節點訓練,以互動方式在您的腳本中設置中斷點 (breakpoint) 可能會很方便。 我們提供一種方便地為單一 rank 設置中斷點的方法。