Rendezvous¶
在 Torch 分散式彈性的上下文中,我們使用術語 *rendezvous* 來指稱結合了**分散式同步**原始物件與**對等點探索**的特定功能。
Torch 分散式彈性使用它來收集訓練任務的參與者(即節點),以便它們都同意相同的參與者清單以及每個人的角色,並在訓練何時可以開始/恢復時做出一致的集體決定。
Torch 分散式彈性 rendezvous 提供以下關鍵功能
Barrier:
執行 rendezvous 的節點將全部阻塞,直到 rendezvous 被認為已完成 - 當至少 min
個節點已加入 rendezvous barrier(對於同一任務)時,就會發生這種情況。 這也意味著 barrier 不一定是固定大小的。
在達到 min
個節點數量後,還有一個額外的短暫等待時間 - 這用於確保 rendezvous 不會「太快」完成(這可能會排除在大約同一時間嘗試加入的其他節點)。
如果在 barrier 收集了 max
個節點,則 rendezvous 會立即完成。
此外,還有一個整體逾時設定,如果始終未達到節點的min
數量,則會導致 rendezvous 失敗 - 這是一個簡單的防呆機制,用於在資源管理器出現問題時釋放部分分配的工作資源,並且應被視為不可重試的錯誤。
獨佔性:
一個簡單的分散式 barrier 是不夠的,因為我們還需要確保在任何給定時間(對於給定的工作)只存在一組節點。換句話說,新的節點(即稍後加入的節點)不應該能夠為同一工作形成並行的獨立工作節點群組。
Torch Distributed Elastic rendezvous 確保如果一組節點已經完成 rendezvous(因此可能已經在訓練),那麼嘗試進行 rendezvous 的其他「延遲」節點只會聲明它們正在等待,並且必須等到(先前完成的)現有 rendezvous 首先被銷毀。
一致性:
當 rendezvous 完成時,所有成員都將對工作成員資格以及每個成員在其中的角色達成一致。此角色使用一個整數表示,稱為 rank,介於 0 和 world size 之間。
請注意,rank 是不穩定的,因為同一個節點在下一次(重新)rendezvous 中可能會被分配到不同的 rank。
容錯性:
Torch Distributed Elastic rendezvous 旨在容忍 rendezvous 過程中節點的故障。如果一個進程在加入 rendezvous 和完成 rendezvous 之間崩潰(或失去網路連線等),則會自動與剩餘的健康節點重新進行 rendezvous。
節點也可能在完成 rendezvous 之後(或被其他節點觀察到已完成)發生故障 - 這種情況將由 Torch Distributed Elastic train_loop
處理(並且也將觸發重新 rendezvous)。
共享鍵值儲存:
當 rendezvous 完成時,會建立並傳回一個共享鍵值儲存。此儲存實現了 torch.distributed.Store
API(請參閱分散式通訊文件)。
此儲存僅由已完成 rendezvous 的成員共享。它旨在由 Torch Distributed Elastic 用於交換初始化工作控制和資料平面所需的信息。
等待中的 workers 和 rendezvous 關閉:
Torch Distributed Elastic rendezvous 處理物件提供了額外的功能,這些功能在技術上不屬於 rendezvous 過程的一部分
查詢有多少 workers 延遲到達 barrier,他們可以參與下一次 rendezvous。
設定 rendezvous 為關閉,以向所有節點發出訊號,指示不要參與下一次 rendezvous。
DynamicRendezvousHandler:
Torch Distributed Elastic 包含 DynamicRendezvousHandler
類別,該類別實現了上述 rendezvous 機制。它是一種與後端無關的類型,需要一個特定的 RendezvousBackend
實例在建構期間指定。
Torch distributed 使用者可以實作他們自己的後端類型,或者使用 PyTorch 附帶的以下實作之一
C10dRendezvousBackend
:使用 C10d 儲存(預設為TCPStore
)作為 rendezvous 後端。使用 C10d 儲存的主要優點是它不需要任何第三方依賴項(例如 etcd)來建立 rendezvous。EtcdRendezvousBackend
:取代了舊版的EtcdRendezvousHandler
類別。將EtcdRendezvousBackend
實例傳遞給DynamicRendezvousHandler
在功能上等同於實例化EtcdRendezvousHandler
。store = TCPStore("localhost") backend = C10dRendezvousBackend(store, "my_run_id") rdzv_handler = DynamicRendezvousHandler.from_backend( run_id="my_run_id", store=store, backend=backend, min_nodes=2, max_nodes=4 )
以下是一個狀態圖,描述了 rendezvous 的運作方式。

註冊表¶
- class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source][source]¶
保存用於建構
RendezvousHandler
的參數。- 參數
- class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[原始碼][原始碼]¶
表示
RendezvousHandler
後端的註冊表。
Handler¶
- class torch.distributed.elastic.rendezvous.RendezvousHandler[原始碼][原始碼]¶
主要 rendezvous 介面。
注意
Distributed Torch 使用者通常**不需要**實作他們自己的
RendezvousHandler
。基於 C10d Store 的實作已經提供,並且建議大多數使用者使用。- abstract get_run_id()[原始碼][原始碼]¶
傳回 rendezvous 的執行 ID。
執行 ID 是一個使用者定義的 ID,它唯一標識分散式應用程式的一個實例。它通常對應於一個作業 ID,用於允許節點加入正確的分散式應用程式。
- 傳回類型
- abstract is_closed()[原始碼][原始碼]¶
檢查 rendezvous 是否已關閉。
關閉的 rendezvous 表示所有未來在同一作業內重新 rendezvous 的嘗試都將失敗。
is_closed()
和set_closed()
具有最終傳播的語意,不應用於同步。其意圖是,如果至少一個節點確定作業已完成,它將關閉 rendezvous,而其他節點將很快觀察到這一點並停止運行。- 傳回類型
- abstract next_rendezvous()[原始碼][原始碼]¶
進入 rendezvous 屏障的主要入口點。
阻塞直到 rendezvous 完成,並且當前進程包含在形成的 worker 群組中,或者發生逾時,或者 rendezvous 被標記為已關閉。
- 傳回
RendezvousInfo
的實例。- 引發
RendezvousClosedError – rendezvous 已關閉。
RendezvousConnectionError – 連接到 rendezvous 後端失敗。
RendezvousStateError – rendezvous 狀態已損毀。
RendezvousTimeoutError – 會合程序未及時完成。
- 傳回類型
- abstract num_nodes_waiting()[source][source]¶
傳回在會合屏障處延遲抵達,因此未被包含在目前工作節點群組中的節點數量。
呼叫者應定期呼叫此方法,以檢查是否有新的節點正在等待加入任務,如果是,則透過呼叫
next_rendezvous()
(重新會合) 來允許它們加入。- 傳回類型
- abstract shutdown()[source][source]¶
關閉為會合程序開啟的所有資源。
範例
rdzv_handler = ... try: store, rank, world_size = rdzv_handler.next_rendezvous() finally: rdzv_handler.shutdown()
- 傳回類型
- property use_agent_store: bool¶
指出
next_rendezvous()
傳回的儲存參考可以與使用者應用程式共享,並在應用程式生命週期內可用。Rendezous handler impl 將以
RendezvousStoreInfo
的實例形式共享儲存詳細資訊。 應用程式按照慣例使用 MASTER_ADDR/MASTER_PORT 環境變數來查詢儲存。
資料類別¶
- class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source][source]¶
保存有關會合程序的資訊。
例外¶
- class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[原始碼][原始碼]¶
當 rendezvous(會合點)關閉時引發。
- class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[原始碼][原始碼]¶
當 rendezvous 未及時完成時引發。
- class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[原始碼][原始碼]¶
當與 rendezvous 後端的連線失敗時引發。
實作¶
動態 Rendezvous¶
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[原始碼][原始碼]¶
從指定的參數建立一個新的
DynamicRendezvousHandler
。- 參數
store (Store) – C10d store,將作為 rendezvous 的一部分返回。
backend (RendezvousBackend) – 用於保存 rendezvous 狀態的後端。
- 傳回類型
參數
描述
join_timeout
rendezvous 預期完成的總時間(以秒為單位)。預設值為 600 秒。
last_call_timeout
達到最少節點數後,完成 rendezvous 之前的額外等待時間(以秒為單位)。預設值為 30 秒。
close_timeout
在呼叫
RendezvousHandler.set_closed()
或RendezvousHandler.shutdown()
後,rendezvous 預期關閉的時間(以秒為單位)。預設值為 30 秒。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[原始碼][原始碼]¶
表示一個處理器,用於在一組節點之間設定 rendezvous。
- classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[原始碼][原始碼]¶
建立一個新的
DynamicRendezvousHandler
。- 參數
run_id (str) – rendezvous 的執行 ID。
store (Store) – C10d store,將作為 rendezvous 的一部分返回。
backend (RendezvousBackend) – 用於保存 rendezvous 狀態的後端。
min_nodes (int) – 允許加入 rendezvous 的最小節點數。
max_nodes (int) – 允許加入 rendezvous 的最大節點數。
timeout (Optional[RendezvousTimeout]) – 交會點的逾時設定。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[原始碼][原始碼]¶
表示持有交會點狀態的後端。
- abstract get_state()[原始碼][原始碼]¶
取得交會點狀態。
- 傳回
一個包含編碼後的交會點狀態及其 fencing token 的 tuple,如果後端中沒有找到狀態,則為
None
。- 引發
RendezvousConnectionError – 連接到後端失敗。
RendezvousStateError – rendezvous 狀態已損毀。
- 傳回類型
Optional[Tuple[bytes, Any]]
- abstract property name: str¶
取得後端的名稱。
- abstract set_state(state, token=None)[原始碼][原始碼]¶
設定交會點狀態。
新的交會點狀態是有條件設定的
如果指定的
token
與儲存在後端的 fencing token 匹配,則狀態將被更新。 新狀態將與其 fencing token 一起返回給調用者。如果指定的
token
與儲存在後端的 fencing token 不匹配,則狀態將不會被更新; 而是將現有狀態及其 fencing token 返回給調用者。如果指定的
token
是None
,則僅當後端中沒有現有狀態時,才會設定新狀態。 新狀態或現有狀態及其 fencing token 都將返回給調用者。
- 參數
state (bytes) – 編碼後的交會點狀態。
token (Optional[Any]) – 一個可選的 fencing token,該 token 是通過先前調用
get_state()
或set_state()
檢索的。
- 傳回
一個包含序列化的交會點狀態、其 fencing token 和一個布林值的 tuple,該布林值指示我們的設定嘗試是否成功。
- 引發
RendezvousConnectionError – 連接到後端失敗。
RendezvousStateError – rendezvous 狀態已損毀。
- 傳回類型
Optional[Tuple[bytes, Any, bool]]
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[原始碼][原始碼]¶
持有交會點的逾時設定。
- 參數
C10d Backend¶
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source][source]¶
從指定的參數建立新的
C10dRendezvousBackend
。參數
描述
store_type
C10d 儲存的類型。目前支援的類型為 “tcp” 和 “file”,分別對應於
torch.distributed.TCPStore
和torch.distributed.FileStore
。預設值為 “tcp”。read_timeout
儲存操作的讀取逾時時間(以秒為單位)。預設值為 60 秒。
請注意,這僅適用於
torch.distributed.TCPStore
。它與torch.distributed.FileStore
無關,因為它不將逾時時間作為參數。is_host
一個布林值,指示此 backend 實例是否將託管 C10d 儲存。如果未指定,則會透過將此機器的 hostname 或 IP 位址與指定的 rendezvous 端點比對來進行啟發式推斷。預設值為
None
。請注意,此配置選項僅適用於
torch.distributed.TCPStore
。在正常情況下,您可以安全地跳過它;唯一需要它的時候是無法正確判斷其值的時候 (例如,rendezvous 端點具有 CNAME 作為 hostname 或與機器的 FQDN 不符)。- 傳回類型
Etcd Backend¶
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source][source]¶
從指定的參數建立一個新的
EtcdRendezvousBackend
。參數
描述
read_timeout
etcd 操作的讀取逾時時間,以秒為單位。預設值為 60 秒。
protocol
用於與 etcd 通訊的協定。有效值為 “http” 和 “https”。預設值為 “http”。
ssl_cert
與 HTTPS 一起使用的 SSL 客戶端憑證的路徑。預設值為
None
。ssl_cert_key
與 HTTPS 一起使用的 SSL 客戶端憑證私鑰的路徑。預設值為
None
。ca_cert
root SSL 授權憑證的路徑。預設值為
None
。- 傳回類型
Etcd Rendezvous (舊版)¶
警告
DynamicRendezvousHandler
類別取代了 EtcdRendezvousHandler
類別,建議大多數使用者使用。 EtcdRendezvousHandler
處於維護模式,未來將被棄用。
- class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source][source]¶
實作一個由
torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous
支援的torch.distributed.elastic.rendezvous.RendezvousHandler
介面。EtcdRendezvousHandler
使用 URL 來配置要使用的 rendezvous 類型,並將特定於實作的配置傳遞給 rendezvous 模組。基本的 etcd rendezvous 配置 URL 如下所示etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa: W605 -- example -- etcd://127.0.0.1:2379/1234?min_workers=1&max_workers=3
上面的 URL 解釋如下
使用使用
etcd
方案註冊的 rendezvous 處理常式要使用的
etcd
端點是localhost:2379
job_id == 1234
用作 etcd 中的前綴(這允許為多個任務共享一個通用的 etcd 伺服器,只要能保證job_ids
的唯一性)。請注意,任務 ID 可以是任何字串(例如,不需要是數字),只要它是唯一的即可。min_workers=1
和max_workers=3
指定了成員數量的範圍 - 只要叢集大小大於或等於min_workers
,Torch Distributed Elastic 就會開始運行任務,並且最多允許max_workers
個工作節點加入叢集。
以下是可以傳遞給 etcd rendezvous 的完整參數列表
參數
描述
min_workers
rendezvous 有效的最小工作節點數
max_workers
允許加入的最大工作節點數
timeout
next_rendezvous 預期成功的總逾時時間(預設為 600 秒)
last_call_timeout
達到最小工作節點數後額外的等待時間(「最後呼叫」)(預設為 30 秒)
etcd_prefix
路徑前綴(從 etcd 根目錄開始),所有 etcd 節點將在其中建立(預設為
/torchelastic/p2p
)
Etcd 儲存¶
EtcdStore
是 C10d Store
實例類型,當 etcd 用作 rendezvous 後端時,由 next_rendezvous()
返回。
- class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source][source]¶
透過依附在 rendezvous etcd 實例上來實現 c10 Store 介面。
這是
EtcdRendezvous
返回的儲存物件。- add(key, num)[source][source]¶
以原子方式將一個值增加一個整數值。
整數使用以 10 為基底的字串表示。如果 key 不存在,則預設值將為
0
。- 傳回
新的(遞增的)值
- 傳回類型
Etcd 伺服器¶
EtcdServer
是一個方便的類別,可讓您輕鬆地在子程序上啟動和停止 etcd 伺服器。這對於測試或單節點(多工作節點)部署很有用,在這種情況下,手動設定 etcd 伺服器很麻煩。
警告
對於生產環境和多節點部署,請考慮正確部署高可用性的 etcd 伺服器,因為這是您分散式任務的單點故障。
- class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source][source]¶
注意
已於 etcd server v3.4.3 上測試。
在隨機的可用埠上啟動和停止本地獨立的 etcd 伺服器。 適用於單節點、多工作程序的啟動或測試,其中 sidecar etcd 伺服器比單獨設置 etcd 伺服器更方便。
此類別註冊一個終止處理程序,以在退出時關閉 etcd 子進程。 此終止處理程序不能取代呼叫
stop()
方法。使用以下回退機制來尋找 etcd 二進制檔案
使用環境變數 TORCHELASTIC_ETCD_BINARY_PATH
如果存在,則使用
<this file root>/bin/etcd
從
PATH
中使用etcd
用法
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd") server.start() client = server.get_client() # use client server.stop()
- 參數
etcd_binary_path – etcd 伺服器二進制檔案的路徑 (有關回退路徑,請參閱上文)