捷徑

Elastic Agent

伺服器

elastic agent 是 torchelastic 的控制平面。

它是一個啟動和管理底層 worker 程序的程序。 agent 負責

  1. 與分散式 torch 協同工作:worker 在啟動時會擁有所有必要的資訊,可以成功且輕鬆地呼叫 torch.distributed.init_process_group()

  2. 容錯:監視 worker,並在偵測到 worker 故障或不健康時,拆除所有 worker 並重新啟動所有 worker。

  3. 彈性:對成員變更做出反應,並使用新成員重新啟動 worker。

最簡單的 Agent 會部署在每個節點上,並與本地程序協同運作。更進階的 Agent 可以遠端啟動及管理 Worker。Agent 可以完全去中心化,根據其管理的 Worker 來做決策。或者也可以進行協調,與其他 Agent(管理同一個 Job 中的 Worker)進行溝通,以做出集體決策。

以下是管理本地 Worker 群組的 Agent 的示意圖。

../_images/agent_diagram.jpg

概念

本節介紹與理解 torchelastic 中 agent 角色相關的高階類別和概念。

class torch.distributed.elastic.agent.server.ElasticAgent[原始碼][原始碼]

Agent 程序負責管理一個或多個 Worker 程序。

Worker 程序應為常規的分散式 PyTorch 腳本。當 Worker 程序由 Agent 建立時,Agent 會提供必要的資訊,以便 Worker 程序正確初始化 torch 程序群組。

Agent 到 Worker 的確切部署拓撲和比率取決於 Agent 的具體實現和使用者的 Job 放置偏好。例如,要在 GPU 上使用 8 個 Trainer(每個 GPU 一個)運行分散式訓練 Job,可以:

  1. 使用 8 個單 GPU 實例,每個實例放置一個 Agent,每個 Agent 管理 1 個 Worker。

  2. 使用 4 個雙 GPU 實例,每個實例放置一個 Agent,每個 Agent 管理 2 個 Worker。

  3. 使用 2 個四 GPU 實例,每個實例放置一個 Agent,每個 Agent 管理 4 個 Worker。

  4. 使用 1 個 8 GPU 實例,每個實例放置一個 Agent,每個 Agent 管理 8 個 Worker。

用法

group_result = agent.run()
 if group_result.is_failed():
   # workers failed
   failure = group_result.failures[0]
   logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # return rank 0's results
abstract get_worker_group(role='default')[原始碼][原始碼]

傳回給定 roleWorkerGroup

請注意,Worker 群組是一個可變物件,因此在多執行緒/程序的環境中,其狀態可能會變更。建議(但非必要)實現者傳回防禦性的唯讀副本。

傳回類型

WorkerGroup

abstract run(role='default')[原始碼][原始碼]

執行 Agent。

支援在失敗時重試 Worker 群組,最多 max_restarts 次。

傳回

執行的結果,包含每個 Worker 的傳回值或失敗詳細資訊,並按照 Worker 的全域排名進行對應。

引發

Exception - 任何其他與 Worker 程序無關的失敗

傳回類型

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[原始碼][原始碼]

關於特定類型 Worker 的藍圖資訊。

對於給定的 Role,必須只存在一個 Worker Spec。Worker Spec 預期在所有節點(機器)上是同質的,也就是說,每個節點都為特定的 Spec 運行相同數量的 Worker。

參數
  • role (str) – 此 Spec 的 Worker 的使用者定義 Role

  • local_world_size (int) – 要運行的本地 Worker 數量

  • fn (Optional[Callable]) – (已棄用,請改用 entrypoint)

  • entrypoint (Optional[Union[Callable, str]]) – Worker 函數或命令

  • args (Tuple) – 要傳遞給 entrypoint 的引數

  • rdzv_handler (RendezvousHandler) – 處理這組 Worker 的 rdzv

  • max_restarts (int) – worker 的最大重試次數

  • monitor_interval (float) – 每 n 秒監控 worker 的狀態

  • master_port (Optional[int]) – 執行 rank 0 上 c10d store 的固定 port。如果未指定,則會選擇一個隨機的可用 port

  • master_addr (Optional[str]) – 執行 rank 0 上 c10d store 的固定 master_addr。如果未指定,則會選擇 agent rank 0 上的 hostname

  • redirects – 將標準輸出流重新導向到檔案。可以通過傳遞一個映射表,有選擇性地針對特定本地 rank 進行重新導向

  • tee – 將指定的標準輸出流複製一份到 console 和檔案。可以通過傳遞一個映射表,有選擇性地針對特定本地 rank 進行複製。優先於 redirects 設定。

get_entrypoint_name()[source][source]

取得 entry point 名稱。

如果 entrypoint 是一個函數 (例如 Callable),則返回其 __qualname__。否則,如果 entrypoint 是一個二進制檔案 (例如 str),則返回二進制檔案名稱。

class torch.distributed.elastic.agent.server.WorkerState(value)[source][source]

WorkerGroup 的狀態。

worker group 中的 worker 會以一個單元的方式改變狀態。 如果 worker group 中有一個 worker 失敗,則整個集合會被認為是失敗的

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

worker group 從初始的 INIT 狀態開始,然後進展到 HEALTHYUNHEALTHY 狀態,最後到達終端 SUCCEEDEDFAILED 狀態。

worker group 可以被中斷並暫時置於 STOPPED 狀態,由 agent 執行。 STOPPED 狀態的 worker 計劃在不久的將來由 agent 重新啟動。 worker 被置於 STOPPED 狀態的一些範例是

  1. Worker group 故障|觀察到不健康

  2. 檢測到成員資格變更

當對 worker group 執行動作 (start, stop, rdzv, retry 等) 失敗,並導致該動作部分應用於 worker group 時,狀態將為 UNKNOWN。 通常,這發生在 agent 上狀態變更事件期間,未捕獲/未處理的異常。 agent 不應嘗試恢復處於 UNKNOWN 狀態的 worker group,最好自行終止,並允許 job manager 重試該節點。

static is_running(state)[source][source]

返回 Worker 的狀態。

傳回

如果 worker 狀態表示 worker 仍在運行 (例如,該進程存在但不一定健康),則為 True。

傳回類型

bool

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source][source]

一個 worker 實例。

將此與表示 worker 規範的 WorkerSpec 進行比較。 Worker 是從 WorkerSpec 創建的。 Worker 之於 WorkerSpec,就像 object 之於 class。

worker 的 idElasticAgent 的特定實現來解釋。 對於本地 agent,它可以是 worker 的 pid (int),對於遠程 agent,它可以編碼為 host:port (string)

參數
  • id (Any) – 唯一標識一個 worker (由 agent 解釋)

  • local_rank (int) – worker 的本機排序 (local rank)

  • global_rank (int) – worker 的全域排序 (global rank)

  • role_rank (int) – 在所有具有相同角色的 workers 中,worker 的排序

  • world_size (int) – workers 的數量 (全域)

  • role_world_size (int) – 具有相同角色的 workers 數量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source][source]

一組 Worker 實例。

此類別定義了由 ElasticAgent 管理的給定 WorkerSpec 的一組 Worker 實例。worker 群組是否包含跨實例的 workers 取決於 agent 的實作。

實作

以下是由 torchelastic 提供的 agent 實作。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source][source]

處理主機本機 workers 的 torchelastic.agent.server.ElasticAgent 的實作。

此 agent 部署在每個主機上,並配置為產生 n 個 workers。使用 GPU 時,n 對應於主機上可用的 GPU 數量。

即使 workers 可能進行主機間通訊,本機 agent 也不會與部署在其他主機上的其他本機 agent 通訊。worker id 被解釋為本機程序。agent 會作為一個單元啟動和停止所有 worker 程序。

傳遞給 worker 函數的 worker 函數和引數必須與 Python multiprocessing 相容。若要將 multiprocessing 資料結構傳遞給 workers,您可以在與指定的 start_method 相同的 multiprocessing 上下文中建立資料結構,並將其作為函數引數傳遞。

exit_barrier_timeout 指定等待其他 agents 完成的時間量(以秒為單位)。這充當安全網,以處理 workers 在不同時間完成的情況,以防止 agents 將早期完成的 workers 視為縮減事件。強烈建議使用者程式碼處理確保以同步方式終止 workers,而不是依賴 exit_barrier_timeout。

如果環境變數 TORCHELASTIC_ENABLE_FILE_TIMER 的值為 1 且已在 `LocalElasticAgent` 流程中定義,則可以在 `LocalElasticAgent` 中啟用基於具名管道 (named pipe) 的 watchdog。或者,可以設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`,並為具名管道設定唯一的檔名。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE`,則 `LocalElasticAgent` 將在內部建立唯一的檔名,並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,並且此環境變數將傳播到 worker 流程,以允許它們連接到 `LocalElasticAgent` 使用的同一個具名管道。

日誌會寫入指定的日誌目錄。預設情況下,每個日誌行都會以 [${role_name}${local_rank}]: 作為前綴(例如,[trainer0]: foobar)。可以通過傳遞 範本字串 作為 log_line_prefix_template 引數來自訂日誌前綴。以下巨集(識別碼)在執行時會被替換:${role_name}, ${local_rank}, ${rank}。例如,若要使用全域排序而不是本機排序作為每個日誌行的前綴,請設定 log_line_prefix_template = "[${rank}]:

啟動函數範例

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("trainer failed")
    else:
        print(f"rank 0 return value: {results.return_values[0]}")
        # prints -> rank 0 return value: do train

啟動二進位檔案範例

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("binary launches do not have return values")

擴展 Agent

若要擴展 agent,您可以直接實作 `ElasticAgent,但我們建議您改為擴展 SimpleElasticAgent,它提供了大部分 scaffolding 並留給您一些特定的抽象方法來實作。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source][source]

一個 ElasticAgent,用於管理一種特定類型的工作者角色。

一個 ElasticAgent,用於管理單個 WorkerSpec 的工作者 (WorkerGroup),例如一種特定類型的工作者角色。

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source][source]

決定工作者進程的正確排名。

快速路徑:當所有工作者都具有相同的角色和世界規模時。 我們計算全域排名為 group_rank * group_world_size + local_rank。 role_world_sizeglobal_world_size 相同。 在這種情況下不使用 TCP 儲存。 僅當使用者將環境變數 TORCH_ELASTIC_WORKER_IDENTICAL 設為 1 時,才會啟用此功能。

時間複雜度:每個工作者 O(1),總體 O(1)

慢速路徑:當工作者具有不同的角色和世界規模時。 我們使用以下演算法

  1. 每個代理程式將其組態(group_rank、group_world_size、num_workers)寫入通用儲存。

  2. 排名 0 的代理程式從儲存讀取所有 role_info,並決定每個代理程式的工作者排名。

  3. 決定全域排名:工作者的全域排名是透過累積在它前面的所有工作者的 local_world_size 的總和來計算的。 出於效率考量,會為每個工作者指派一個基本全域排名,使其工作者的範圍在 [base_global_rank, base_global_rank + local_world_size) 內。

  4. 決定角色排名:角色排名是使用第 3 點中的演算法來決定的,唯一的例外是排名是根據角色名稱計算的。

  5. 排名 0 的代理程式將指派的排名寫入儲存。

  6. 每個代理程式從儲存讀取指派的排名。

時間複雜度:每個工作者 O(1),rank0 O(n),總體 O(n)

傳回類型

List[Worker]

_exit_barrier()[source][source]

定義一個屏障,使代理程式進程保持活動狀態,直到所有工作者完成。

等待 exit_barrier_timeout 秒,讓所有代理程式完成執行其本機工作者(無論是否成功)。 這可作為安全措施,防止使用者指令碼在不同時間終止。

_initialize_workers(worker_group)[source][source]

為 worker_group 啟動一組全新的工作者。

本質上,它是一個 rendezvous,後跟一個 start_workers。 在呼叫此方法之前,呼叫者應先呼叫 _stop_workers() 以停止正在執行的工作者。

樂觀地將剛啟動的工作者群組的狀態設定為 HEALTHY,並將狀態的實際監控委派給 _monitor_workers() 方法

abstract _monitor_workers(worker_group)[source][source]

檢查 worker_group 的工作者。

此函數也會傳回工作者群組的新狀態。

傳回類型

RunResult

_rendezvous(worker_group)[source][source]

為工作者規格指定的工作者執行 rendezvous。

為工作者指派新的全域排名和世界規模。 更新工作者群組的 rendezvous 儲存。

_restart_workers(worker_group)[source][source]

重新啟動 (停止、rendezvous、啟動) 群組中的所有本機工作者。

abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source][source]

清除在代理程式工作期間配置的所有資源。

參數

death_sig (Signals) – 要傳送到子進程的訊號,預設為 SIGTERM

abstract _start_workers(worker_group)[source][source]

啟動 worker_group.spec.local_world_size 個 worker。

這是根據 worker group 的 worker spec 所訂。回傳一個從 local_rank 到 worker id 的映射。

傳回類型

Dict[int, Any]

abstract _stop_workers(worker_group, is_restart=False)[source][source]

停止指定 worker group 中的所有 workers。

實作者必須處理 WorkerState 定義的所有狀態中的 workers。 也就是說,必須妥善處理停止不存在的 workers、不健康的(卡住的)workers 等。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source][source]

回傳 worker 執行的結果。

Run 結果遵循「全有或全無」策略,只有當此 agent 管理的所有本地 workers 都成功完成時,run 才會成功。

如果結果成功(例如,is_failed() = False),則 return_values 欄位包含此 agent 管理的 workers 的輸出(回傳值),並依其 GLOBAL rank 映射。 也就是說,result.return_values[0] 是 global rank 0 的回傳值。

注意

return_values 僅在 worker entrypoint 是函數時才有意義。指定為 binary entrypoint 的 workers 沒有標準的回傳值,並且 return_values 欄位沒有意義,並且可能為空。

如果 is_failed() 回傳 True,則 failures 欄位包含失敗資訊,同樣地,依失敗的 worker 的 GLOBAL rank 映射。

return_valuesfailures 中的鍵是互斥的,也就是說,worker 的最終狀態只能是以下之一:成功、失敗。根據 agent 的重新啟動策略,由 agent 故意終止的 workers 不會在 return_valuesfailures 中表示。

Agent 中的 Watchdog

如果環境變數 TORCHELASTIC_ENABLE_FILE_TIMER 的值為 1 且已在 `LocalElasticAgent` 流程中定義,則可以在 `LocalElasticAgent` 中啟用基於具名管道 (named pipe) 的 watchdog。或者,可以設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`,並為具名管道設定唯一的檔名。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE`,則 `LocalElasticAgent` 將在內部建立唯一的檔名,並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,並且此環境變數將傳播到 worker 流程,以允許它們連接到 `LocalElasticAgent` 使用的同一個具名管道。

健康檢查伺服器

如果環境變數 TORCHELASTIC_HEALTH_CHECK_PORT 已在 `LocalElasticAgent` 處理程序中定義,則可以在 `LocalElasticAgent` 中啟用健康檢查監控伺服器。新增健康檢查伺服器的介面,可以通過在指定的 port number 上啟動 tcp/http 伺服器來擴展。 此外,健康檢查伺服器將具有回呼,以檢查 watchdog 是否存活。

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source][source]

健康檢查監控伺服器的介面,可以通過在指定的 port 上啟動 tcp/http 伺服器來擴展。

參數
  • alive_callback (Callable[[], int]) – Callable[[], int], 回呼到 agent 的上次進度時間

  • port (int) – int, 啟動 tcp/http 伺服器的 port number

  • timeout (int) – int, 判斷 agent 是否存活/死亡的逾時秒數

start()[source][source]

Pytorch 不支援此功能,不會啟動任何健康檢查伺服器

stop()[source][source]

停止健康檢查伺服器的函數

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[原始碼][原始碼]

建立健康檢查伺服器物件

傳回類型

HealthCheckServer

文件

存取 PyTorch 的完整開發者文件

檢視文件

教學

取得初學者和進階開發者的深入教學

檢視教學

資源

尋找開發資源並獲得您的問題解答

檢視資源