捷徑

Rendezvous

在 Torch 分散式彈性的上下文中,我們使用術語 *rendezvous* 來指稱結合了**分散式同步**原始物件與**對等點探索**的特定功能。

Torch 分散式彈性使用它來收集訓練任務的參與者(即節點),以便它們都同意相同的參與者清單以及每個人的角色,並在訓練何時可以開始/恢復時做出一致的集體決定。

Torch 分散式彈性 rendezvous 提供以下關鍵功能

Barrier:

執行 rendezvous 的節點將全部阻塞,直到 rendezvous 被認為已完成 - 當至少 min 個節點已加入 rendezvous barrier(對於同一任務)時,就會發生這種情況。 這也意味著 barrier 不一定是固定大小的。

在達到 min 個節點數量後,還有一個額外的短暫等待時間 - 這用於確保 rendezvous 不會「太快」完成(這可能會排除在大約同一時間嘗試加入的其他節點)。

如果在 barrier 收集了 max 個節點,則 rendezvous 會立即完成。

此外,還有一個整體逾時設定,如果始終未達到節點的min數量,則會導致 rendezvous 失敗 - 這是一個簡單的防呆機制,用於在資源管理器出現問題時釋放部分分配的工作資源,並且應被視為不可重試的錯誤。

獨佔性:

一個簡單的分散式 barrier 是不夠的,因為我們還需要確保在任何給定時間(對於給定的工作)只存在一組節點。換句話說,新的節點(即稍後加入的節點)不應該能夠為同一工作形成並行的獨立工作節點群組。

Torch Distributed Elastic rendezvous 確保如果一組節點已經完成 rendezvous(因此可能已經在訓練),那麼嘗試進行 rendezvous 的其他「延遲」節點只會聲明它們正在等待,並且必須等到(先前完成的)現有 rendezvous 首先被銷毀。

一致性:

當 rendezvous 完成時,所有成員都將對工作成員資格以及每個成員在其中的角色達成一致。此角色使用一個整數表示,稱為 rank,介於 0 和 world size 之間。

請注意,rank 是不穩定的,因為同一個節點在下一次(重新)rendezvous 中可能會被分配到不同的 rank。

容錯性:

Torch Distributed Elastic rendezvous 旨在容忍 rendezvous 過程中節點的故障。如果一個進程在加入 rendezvous 和完成 rendezvous 之間崩潰(或失去網路連線等),則會自動與剩餘的健康節點重新進行 rendezvous。

節點也可能在完成 rendezvous 之後(或被其他節點觀察到已完成)發生故障 - 這種情況將由 Torch Distributed Elastic train_loop 處理(並且也將觸發重新 rendezvous)。

共享鍵值儲存:

當 rendezvous 完成時,會建立並傳回一個共享鍵值儲存。此儲存實現了 torch.distributed.Store API(請參閱分散式通訊文件)。

此儲存僅由已完成 rendezvous 的成員共享。它旨在由 Torch Distributed Elastic 用於交換初始化工作控制和資料平面所需的信息。

等待中的 workers 和 rendezvous 關閉:

Torch Distributed Elastic rendezvous 處理物件提供了額外的功能,這些功能在技術上不屬於 rendezvous 過程的一部分

  1. 查詢有多少 workers 延遲到達 barrier,他們可以參與下一次 rendezvous。

  2. 設定 rendezvous 為關閉,以向所有節點發出訊號,指示不要參與下一次 rendezvous。

DynamicRendezvousHandler:

Torch Distributed Elastic 包含 DynamicRendezvousHandler 類別,該類別實現了上述 rendezvous 機制。它是一種與後端無關的類型,需要一個特定的 RendezvousBackend 實例在建構期間指定。

Torch distributed 使用者可以實作他們自己的後端類型,或者使用 PyTorch 附帶的以下實作之一

以下是一個狀態圖,描述了 rendezvous 的運作方式。

../_images/etcd_rdzv_diagram.png

註冊表

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source][source]

保存用於建構 RendezvousHandler 的參數。

參數
  • backend (str) – 用於處理 rendezvous 的後端名稱。

  • endpoint (str) – rendezvous 的端點,通常為 <hostname>[:<port>] 格式。

  • run_id (str) – rendezvous 的 ID。

  • min_nodes (int) – 允許加入 rendezvous 的最小節點數。

  • max_nodes (int) – 允許加入 rendezvous 的最大節點數。

  • local_addr (Optional[str]) – 本機節點的位址。

  • **kwargs – 指定後端的其他參數。

get(key, default=None)[原始碼][原始碼]

如果存在 key,則傳回 key 的值,否則傳回 default

傳回類型

Any

get_as_bool(key, default=None)[原始碼][原始碼]

傳回 key 的值,並轉換為 bool 類型。

傳回類型

Optional[bool]

get_as_int(key, default=None)[原始碼][原始碼]

傳回 key 的值,並轉換為 int 類型。

傳回類型

Optional[int]

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[原始碼][原始碼]

表示 RendezvousHandler 後端的註冊表。

Handler

class torch.distributed.elastic.rendezvous.RendezvousHandler[原始碼][原始碼]

主要 rendezvous 介面。

注意

Distributed Torch 使用者通常**不需要**實作他們自己的 RendezvousHandler。基於 C10d Store 的實作已經提供,並且建議大多數使用者使用。

abstract get_backend()[原始碼][原始碼]

傳回 rendezvous 後端的名稱。

傳回類型

str

abstract get_run_id()[原始碼][原始碼]

傳回 rendezvous 的執行 ID。

執行 ID 是一個使用者定義的 ID,它唯一標識分散式應用程式的一個實例。它通常對應於一個作業 ID,用於允許節點加入正確的分散式應用程式。

傳回類型

str

abstract is_closed()[原始碼][原始碼]

檢查 rendezvous 是否已關閉。

關閉的 rendezvous 表示所有未來在同一作業內重新 rendezvous 的嘗試都將失敗。

is_closed()set_closed() 具有最終傳播的語意,不應用於同步。其意圖是,如果至少一個節點確定作業已完成,它將關閉 rendezvous,而其他節點將很快觀察到這一點並停止運行。

傳回類型

bool

abstract next_rendezvous()[原始碼][原始碼]

進入 rendezvous 屏障的主要入口點。

阻塞直到 rendezvous 完成,並且當前進程包含在形成的 worker 群組中,或者發生逾時,或者 rendezvous 被標記為已關閉。

傳回

RendezvousInfo 的實例。

引發
傳回類型

RendezvousInfo

abstract num_nodes_waiting()[source][source]

傳回在會合屏障處延遲抵達,因此未被包含在目前工作節點群組中的節點數量。

呼叫者應定期呼叫此方法,以檢查是否有新的節點正在等待加入任務,如果是,則透過呼叫next_rendezvous() (重新會合) 來允許它們加入。

傳回類型

int

abstract set_closed()[source][source]

將會合程序標記為已關閉。

abstract shutdown()[source][source]

關閉為會合程序開啟的所有資源。

範例

rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
傳回類型

bool

property use_agent_store: bool

指出 next_rendezvous() 傳回的儲存參考可以與使用者應用程式共享,並在應用程式生命週期內可用。

Rendezous handler impl 將以 RendezvousStoreInfo 的實例形式共享儲存詳細資訊。 應用程式按照慣例使用 MASTER_ADDR/MASTER_PORT 環境變數來查詢儲存。

資料類別

class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source][source]

保存有關會合程序的資訊。

class torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)[source][source]

儲存位址和埠,可用於引導訓練器分散式通訊

static build(rank, store)[source][source]

工廠方法,在 rank0 主機上尋找未使用的新埠,並與所有 ranks 共享 addr/port 資訊。

如果知道 master_addr/master_port(在共享現有 tcp store server 時很有用),請使用建構函式。

參數
  • rank (int) – 目前節點的 rank

  • store (Store) – 用於會合程序的儲存

  • local_addr (Optional[str]) – 目前節點的位址,如果未提供,將從主機名稱解析

  • server_port (Optional[int]) – TCPStore 伺服器的埠,當 TCPStore 被共享時。

傳回類型

RendezvousStoreInfo

例外

class torch.distributed.elastic.rendezvous.api.RendezvousError[source][source]

表示會合程序錯誤的基本類型。

class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[原始碼][原始碼]

當 rendezvous(會合點)關閉時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[原始碼][原始碼]

當 rendezvous 未及時完成時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[原始碼][原始碼]

當與 rendezvous 後端的連線失敗時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousStateError[原始碼][原始碼]

當 rendezvous 的狀態損壞時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[原始碼][原始碼]

當節點未包含在 rendezvous 中並正常退出時引發。

例外情況是一種退出堆疊的機制,但不代表失敗。

實作

動態 Rendezvous

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[原始碼][原始碼]

從指定的參數建立一個新的 DynamicRendezvousHandler

參數
  • store (Store) – C10d store,將作為 rendezvous 的一部分返回。

  • backend (RendezvousBackend) – 用於保存 rendezvous 狀態的後端。

傳回類型

DynamicRendezvousHandler

參數

描述

join_timeout

rendezvous 預期完成的總時間(以秒為單位)。預設值為 600 秒。

last_call_timeout

達到最少節點數後,完成 rendezvous 之前的額外等待時間(以秒為單位)。預設值為 30 秒。

close_timeout

在呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 後,rendezvous 預期關閉的時間(以秒為單位)。預設值為 30 秒。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[原始碼][原始碼]

表示一個處理器,用於在一組節點之間設定 rendezvous。

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[原始碼][原始碼]

建立一個新的 DynamicRendezvousHandler

參數
  • run_id (str) – rendezvous 的執行 ID。

  • store (Store) – C10d store,將作為 rendezvous 的一部分返回。

  • backend (RendezvousBackend) – 用於保存 rendezvous 狀態的後端。

  • min_nodes (int) – 允許加入 rendezvous 的最小節點數。

  • max_nodes (int) – 允許加入 rendezvous 的最大節點數。

  • local_addr (Optional[str]) – 本機節點位址。

  • timeout (Optional[RendezvousTimeout]) – 交會點的逾時設定。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[原始碼][原始碼]

表示持有交會點狀態的後端。

abstract get_state()[原始碼][原始碼]

取得交會點狀態。

傳回

一個包含編碼後的交會點狀態及其 fencing token 的 tuple,如果後端中沒有找到狀態,則為 None

引發
傳回類型

Optional[Tuple[bytes, Any]]

abstract property name: str

取得後端的名稱。

abstract set_state(state, token=None)[原始碼][原始碼]

設定交會點狀態。

新的交會點狀態是有條件設定的

  • 如果指定的 token 與儲存在後端的 fencing token 匹配,則狀態將被更新。 新狀態將與其 fencing token 一起返回給調用者。

  • 如果指定的 token 與儲存在後端的 fencing token 不匹配,則狀態將不會被更新; 而是將現有狀態及其 fencing token 返回給調用者。

  • 如果指定的 tokenNone,則僅當後端中沒有現有狀態時,才會設定新狀態。 新狀態或現有狀態及其 fencing token 都將返回給調用者。

參數
  • state (bytes) – 編碼後的交會點狀態。

  • token (Optional[Any]) – 一個可選的 fencing token,該 token 是通過先前調用 get_state()set_state() 檢索的。

傳回

一個包含序列化的交會點狀態、其 fencing token 和一個布林值的 tuple,該布林值指示我們的設定嘗試是否成功。

引發
傳回類型

Optional[Tuple[bytes, Any, bool]]

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[原始碼][原始碼]

持有交會點的逾時設定。

參數
  • join (Optional[timedelta]) – 期望交會點完成的時間。

  • last_call (Optional[timedelta]) – 在完成 rendezvous 之前,當 rendezvous 擁有最少數量的必要參與者後,額外的等待時間。

  • close (Optional[timedelta]) – 在呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 後,rendezvous 預期關閉的時間範圍。

  • keep_alive – 預期完成 keep-alive heartbeat 的時間範圍。

property close: timedelta

取得關閉逾時時間。

property heartbeat: timedelta

取得 keep-alive heartbeat 逾時時間。

property join: timedelta

取得加入逾時時間。

property last_call: timedelta

取得最後呼叫逾時時間。

C10d Backend

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source][source]

從指定的參數建立新的 C10dRendezvousBackend

參數

描述

store_type

C10d 儲存的類型。目前支援的類型為 “tcp” 和 “file”,分別對應於 torch.distributed.TCPStoretorch.distributed.FileStore。預設值為 “tcp”。

read_timeout

儲存操作的讀取逾時時間(以秒為單位)。預設值為 60 秒。

請注意,這僅適用於 torch.distributed.TCPStore。它與 torch.distributed.FileStore 無關,因為它不將逾時時間作為參數。

is_host

一個布林值,指示此 backend 實例是否將託管 C10d 儲存。如果未指定,則會透過將此機器的 hostname 或 IP 位址與指定的 rendezvous 端點比對來進行啟發式推斷。預設值為 None

請注意,此配置選項僅適用於 torch.distributed.TCPStore。在正常情況下,您可以安全地跳過它;唯一需要它的時候是無法正確判斷其值的時候 (例如,rendezvous 端點具有 CNAME 作為 hostname 或與機器的 FQDN 不符)。

傳回類型

Tuple[C10dRendezvousBackend, Store]

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[source][source]

代表以 C10d 為後端的 rendezvous backend。

參數
get_state()[source][source]

請參閱基底類別。

傳回類型

Optional[Tuple[bytes, Any]]

property name: str

請參閱基底類別。

set_state(state, token=None)[source][source]

請參閱基底類別。

傳回類型

Optional[Tuple[bytes, Any, bool]]

Etcd Backend

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source][source]

從指定的參數建立一個新的 EtcdRendezvousBackend

參數

描述

read_timeout

etcd 操作的讀取逾時時間,以秒為單位。預設值為 60 秒。

protocol

用於與 etcd 通訊的協定。有效值為 “http” 和 “https”。預設值為 “http”。

ssl_cert

與 HTTPS 一起使用的 SSL 客戶端憑證的路徑。預設值為 None

ssl_cert_key

與 HTTPS 一起使用的 SSL 客戶端憑證私鑰的路徑。預設值為 None

ca_cert

root SSL 授權憑證的路徑。預設值為 None

傳回類型

Tuple[EtcdRendezvousBackend, Store]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[source][source]

表示基於 etcd 的 rendezvous 後端。

參數
  • client (Client) – 用於與 etcd 通訊的 etcd.Client 實例。

  • run_id (str) – rendezvous 的執行 ID。

  • key_prefix (Optional[str]) – 用於在 etcd 中儲存 rendezvous 狀態的路徑。

  • ttl (Optional[int]) – Rendezvous 狀態的 TTL。如果未指定,則預設為兩個小時。

get_state()[source][source]

請參閱基底類別。

傳回類型

Optional[Tuple[bytes, Any]]

property name: str

請參閱基底類別。

set_state(state, token=None)[source][source]

請參閱基底類別。

傳回類型

Optional[Tuple[bytes, Any, bool]]

Etcd Rendezvous (舊版)

警告

DynamicRendezvousHandler 類別取代了 EtcdRendezvousHandler 類別,建議大多數使用者使用。 EtcdRendezvousHandler 處於維護模式,未來將被棄用。

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source][source]

實作一個由 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous 支援的 torch.distributed.elastic.rendezvous.RendezvousHandler 介面。EtcdRendezvousHandler 使用 URL 來配置要使用的 rendezvous 類型,並將特定於實作的配置傳遞給 rendezvous 模組。基本的 etcd rendezvous 配置 URL 如下所示

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

-- example --

etcd://127.0.0.1:2379/1234?min_workers=1&max_workers=3

上面的 URL 解釋如下

  1. 使用使用 etcd 方案註冊的 rendezvous 處理常式

  2. 要使用的 etcd 端點是 localhost:2379

  3. job_id == 1234 用作 etcd 中的前綴(這允許為多個任務共享一個通用的 etcd 伺服器,只要能保證 job_ids 的唯一性)。請注意,任務 ID 可以是任何字串(例如,不需要是數字),只要它是唯一的即可。

  4. min_workers=1max_workers=3 指定了成員數量的範圍 - 只要叢集大小大於或等於 min_workers,Torch Distributed Elastic 就會開始運行任務,並且最多允許 max_workers 個工作節點加入叢集。

以下是可以傳遞給 etcd rendezvous 的完整參數列表

參數

描述

min_workers

rendezvous 有效的最小工作節點數

max_workers

允許加入的最大工作節點數

timeout

next_rendezvous 預期成功的總逾時時間(預設為 600 秒)

last_call_timeout

達到最小工作節點數後額外的等待時間(「最後呼叫」)(預設為 30 秒)

etcd_prefix

路徑前綴(從 etcd 根目錄開始),所有 etcd 節點將在其中建立(預設為 /torchelastic/p2p

Etcd 儲存

EtcdStore 是 C10d Store 實例類型,當 etcd 用作 rendezvous 後端時,由 next_rendezvous() 返回。

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source][source]

透過依附在 rendezvous etcd 實例上來實現 c10 Store 介面。

這是 EtcdRendezvous 返回的儲存物件。

add(key, num)[source][source]

以原子方式將一個值增加一個整數值。

整數使用以 10 為基底的字串表示。如果 key 不存在,則預設值將為 0

傳回

新的(遞增的)值

傳回類型

int

check(keys)[source][source]

檢查是否所有鍵都立即存在(無需等待)。

傳回類型

bool

get(key)[source][source]

通過鍵獲取值,可能執行阻塞等待。

如果 key 沒有立即出現,將會阻塞等待最多 timeout 時間,或者直到 key 被發布。

傳回

value (bytes)

引發

LookupError - 如果在逾時後 key 仍然未發布

傳回類型

bytes

set(key, value)[source][source]

將鍵/值對寫入 EtcdStore

鍵和值都可以是 Python strbytes

wait(keys, override_timeout=None)[source][source]

等待直到所有鍵都被發布,或者直到逾時。

引發

LookupError - 如果發生逾時

Etcd 伺服器

EtcdServer 是一個方便的類別,可讓您輕鬆地在子程序上啟動和停止 etcd 伺服器。這對於測試或單節點(多工作節點)部署很有用,在這種情況下,手動設定 etcd 伺服器很麻煩。

警告

對於生產環境和多節點部署,請考慮正確部署高可用性的 etcd 伺服器,因為這是您分散式任務的單點故障。

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source][source]

注意

已於 etcd server v3.4.3 上測試。

在隨機的可用埠上啟動和停止本地獨立的 etcd 伺服器。 適用於單節點、多工作程序的啟動或測試,其中 sidecar etcd 伺服器比單獨設置 etcd 伺服器更方便。

此類別註冊一個終止處理程序,以在退出時關閉 etcd 子進程。 此終止處理程序不能取代呼叫 stop() 方法。

使用以下回退機制來尋找 etcd 二進制檔案

  1. 使用環境變數 TORCHELASTIC_ETCD_BINARY_PATH

  2. 如果存在,則使用 <this file root>/bin/etcd

  3. PATH 中使用 etcd

用法

server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
參數

etcd_binary_path – etcd 伺服器二進制檔案的路徑 (有關回退路徑,請參閱上文)

文件

存取 PyTorch 的完整開發人員文件

檢視文件

教學課程

取得針對初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並取得問題的解答

檢視資源