快捷方式

到期計時器

到期計時器與 agent 在同一個程序上設置,並從您的腳本中使用,以處理卡住的 worker。當您進入可能卡住的程式碼區塊時,您可以獲取一個到期計時器,該計時器指示計時器伺服器在程序未在自行設定的到期截止日期前釋放計時器的情況下,終止該程序。

用法

import torchelastic.timer as timer
import torchelastic.agent.server as agent

def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message, max_interval=0.01)
    server.start() # non-blocking

    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    agent = agent.LocalElasticAgent(spec, start_method)
    agent.run()

def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60 second expiry
        # do some work

在上面的範例中,如果 trainer_func 花費超過 60 秒才能完成,則 worker 程序將被終止,並且 agent 將重試 worker 群組。

客戶端方法

torch.distributed.elastic.timer.configure(timer_client)[原始碼][原始碼]

設定一個計時器客戶端。必須在使用 expires 之前呼叫。

torch.distributed.elastic.timer.expires(after, scope=None, client=None)[原始碼][原始碼]

取得一個倒數計時器,它會在從現在開始的 after 秒後到期,除非它所包裝的程式碼區塊在此時間範圍內完成。當計時器到期時,此 worker 有資格被收割(reaped)。「收割」的確切含義取決於客戶端的實作。在大多數情況下,收割意味著終止 worker 處理程序。請注意,worker 並**不**保證會在正好 time.now() + after 時被收割,而是 worker「有資格」被收割,並且客戶端與之通訊的 TimerServer 將最終決定何時以及如何收割具有到期計時器的 worker。

用法

torch.distributed.elastic.timer.configure(LocalTimerClient())
with expires(after=10):
    torch.distributed.all_reduce(...)

伺服器/客戶端實作

以下是 torchelastic 提供的計時器伺服器和客戶端配對。

注意

計時器伺服器和客戶端必須始終成對實作和使用,因為伺服器和客戶端之間存在訊息傳遞協定。

以下是基於 multiprocess.Queue 實作的一對計時器伺服器和客戶端。

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[原始碼][原始碼]

LocalTimerClient 搭配使用的伺服器。客戶端預期是正在執行此伺服器的父處理程序的子處理程序。作業中的每個主機預期在本機啟動自己的計時器伺服器,並且每個伺服器實例管理本機 worker 的計時器(在本機同一主機上的處理程序上執行)。

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[原始碼][原始碼]

LocalTimerServer 的客戶端。此客戶端旨在與正在執行 LocalTimerServer 的同一主機上使用,並使用 pid 來唯一識別 worker。這在每個主機上的每個 GPU 產生一個子處理程序 (trainer) 的情況下特別有用。

以下是另一對基於具名管道實作的計時器伺服器和客戶端。

class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[原始碼][原始碼]

FileTimerClient 搭配使用的伺服器。客戶端預期在與正在執行此伺服器的處理程序相同的主機上執行。作業中的每個主機預期在本機啟動自己的計時器伺服器,並且每個伺服器實例管理本機 worker 的計時器(在本機同一主機上的處理程序上執行)。

參數
  • file_path (str) – str,要建立的 FIFO 特殊檔案的路徑。

  • max_interval (float) – float,每個 watchdog 迴圈的最大間隔(秒)。

  • daemon (bool) – bool,是否在守護程序模式下執行 watchdog 執行緒。守護程序執行緒不會阻止處理程序停止。

  • log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – Callable[[Dict[str, str]], None],可選的回呼函數,用於以 JSON 格式記錄事件。

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source][source]

FileTimerServer 的客戶端。此客戶端旨在與執行 FileTimerServer 的主機上使用,並使用 pid 來唯一識別工作進程 (worker)。此客戶端使用具名管道 (named_pipe) 將計時器請求傳送到 FileTimerServer。 此客戶端是生產者,而 FileTimerServer 是消費者。多個客戶端可以與同一個 FileTimerServer 一起工作。

參數
  • file_path (str) – 字串,FIFO 特殊檔案的路徑。FileTimerServer 必須透過呼叫 os.mkfifo() 建立它。

  • signal – signal,用於終止進程的訊號。使用負數或零訊號將不會終止進程。

編寫自訂計時器伺服器/客戶端

若要編寫您自己的計時器伺服器和客戶端,請擴展用於伺服器的 torch.distributed.elastic.timer.TimerServer 和用於客戶端的 torch.distributed.elastic.timer.TimerClientTimerRequest 物件用於在伺服器和客戶端之間傳遞訊息。

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source][source]

資料物件,表示在 TimerClientTimerServer 之間使用的倒數計時器的取得和釋放。負數的 expiration_time 應被解釋為「釋放」請求。

注意

worker_id 的類型是實作特定的。它是 TimerServer 和 TimerClient 實作中用於唯一識別工作進程 (worker) 的任何內容。

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source][source]

實體,用於監視作用中的計時器並及時使其過期。此伺服器負責收割已過期的計時器的工作進程 (worker)。

abstract clear_timers(worker_ids)[source][source]

清除給定 worker_ids 的所有計時器。

abstract get_expired_timers(deadline)[source][source]

傳回每個 worker_id 的所有已過期計時器。過期的計時器是 expiration_time 小於或等於所提供截止時間的計時器。

回傳類型

Dict[str, List[TimerRequest]]

abstract register_timers(timer_requests)[source][source]

處理傳入的計時器請求,並將其註冊到伺服器。計時器請求可以是取得計時器或釋放計時器的請求。expiration_time 為負數的計時器請求應被解釋為釋放計時器的請求。

class torch.distributed.elastic.timer.TimerClient[source][source]

客戶端函式庫,用於透過與 TimerServer 通訊來取得和釋放倒數計時器。

abstract acquire(scope_id, expiration_time)[source][source]

給定 scope_id 和 expiration_time,為持有此客戶端物件的工作者取得計時器。通常會向 TimerServer 註冊計時器。

abstract release(scope_id)[source][source]

釋放此客戶端代表的工作者上的 scope_id 的計時器。在呼叫此方法後,scope 上的倒數計時器將不再生效。

除錯資訊記錄

torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(run_id, expired_timers)[source][source]

文件

取得 PyTorch 的完整開發人員文件

查看文件

教學

取得針對初學者和進階開發人員的深入教學

查看教學

資源

尋找開發資源並取得問題解答

查看資源