Multiprocessing¶
啟動和管理 n
個 worker 子程序的函式庫,這些子程序由函式或二進位檔指定。
對於函式,它使用 torch.multiprocessing
(因此使用 python multiprocessing
)來產生/fork worker 程序。對於二進位檔,它使用 python subprocessing.Popen
來建立 worker 程序。
用法 1:將兩個訓練器作為函式啟動
from torch.distributed.elastic.multiprocessing import Std, start_processes
def trainer(a, b, c):
pass # train
# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
name="trainer",
entrypoint=trainer,
args={0: (1,2,3), 1: (4,5,6)},
envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
log_dir="/tmp/foobar",
redirects=Std.ALL, # write all worker stdout/stderr to a log file
tee={0: Std.ERR}, # tee only local rank 0's stderr to console
)
# waits for all copies of trainer to finish
ctx.wait()
用法 2:將 2 個 echo worker 作為二進位檔啟動
# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
name="echo"
entrypoint="echo",
log_dir="/tmp/foobar",
args={0: "hello", 1: "world"},
redirects={1: Std.OUT},
)
如同 torch.multiprocessing
,函式 start_processes()
的回傳值是一個進程上下文 (api.PContext
)。如果啟動了一個函式,則會回傳一個 api.MultiprocessContext
;如果啟動了一個二進位檔,則會回傳一個 api.SubprocessContext
。兩者都是父類別 api.PContext
的特定實作。
啟動多個 Worker¶
- torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[來源][來源]¶
使用提供的選項啟動
entrypoint
進程的n
個副本。entrypoint
可以是一個Callable
(函式) 或一個str
(二進位檔)。副本的數量由args
和envs
參數的條目數量決定,它們需要具有相同的鍵集合。args
和env
參數是要傳遞給 entrypoint 的引數和環境變數,並依據副本索引(本地排名)進行映射。所有本地排名都必須被考慮在內。也就是說,鍵集合應該是{0,1,...,(nprocs-1)}
。注意
當
entrypoint
是一個二進位檔 (str
) 時,args
只能是字串。如果給出了任何其他類型,則會將其轉換為字串表示形式 (例如str(arg1)
)。此外,只有當主函式使用torch.distributed.elastic.multiprocessing.errors.record
進行註解時,二進位檔失敗才會寫入error.json
錯誤檔案。對於函式啟動,這預設會執行,不需要手動使用@record
註解進行註解。redirects
和tee
是位元遮罩,用於指定要將哪些 std stream 重新導向到log_dir
中的日誌檔案。有效的遮罩值定義在Std
中。要僅重新導向/複製某些本地排名,請將redirects
作為一個映射傳遞,其中鍵作為本地排名來指定重新導向行為。任何遺失的本地排名都預設為Std.NONE
。tee
的作用類似於 unix "tee" 命令,它會將輸出重新導向 + 列印到控制台。要避免 worker 的 stdout/stderr 列印到控制台,請使用redirects
參數。對於每個進程,
log_dir
將包含{local_rank}/error.json
: 如果進程失敗,則包含錯誤資訊的檔案{local_rank}/stdout.json
: 如果redirect & STDOUT == STDOUT
{local_rank}/stderr.json
: 如果redirect & STDERR == STDERR
注意
預期
log_dir
存在、為空且是一個目錄。範例
log_dir = "/tmp/test" # ok; two copies of foo: foo("bar0"), foo("bar1") start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # invalid; envs missing for local rank 1 start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}}, log_dir=log_dir ) # ok; two copies of /usr/bin/touch: touch file1, touch file2 start_processes( name="trainer", entrypoint="/usr/bin/touch", args:{0:("file1",), 1:("file2",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # caution; arguments casted to string, runs: # echo "1" "2" "3" and echo "[1, 2, 3]" start_processes( name="trainer", entrypoint="/usr/bin/echo", args:{0:(1,2,3), 1:([1,2,3],), envs:{0:{}, 1:{}}, log_dir=log_dir )
- 參數
name (str) – 一個人類可讀的簡短名稱,用於描述進程是什麼(在複製 stdout/stderr 輸出時用作標頭)
entrypoint (Union[Callable, str]) – 可以是一個
Callable
(函式) 或cmd
(二進位檔)log_dir – 用於寫入日誌檔案的目錄
start_method (str) – multiprocessing 的啟動方法 (spawn, fork, forkserver) 會被二進位檔忽略
redirects – 要將哪些 std stream 重新導向到日誌檔案
tee – 要將哪些 std stream 重新導向 + 列印到控制台
local_ranks_filter – 要將哪些排名的日誌列印到控制台
- 回傳類型
進程上下文¶
- class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source][source]¶
標準化透過不同機制啟動的一組進程的操作之基底類別。
名稱
PContext
是刻意與torch.multiprocessing.ProcessContext
區分。警告
stdouts 和 stderrs 應**永遠**是 tee_stdouts 和 tee_stderrs (分別) 的超集合,這是因為 tee 的實作方式為重新導向 + tail -f <stdout/stderr.log>
- class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[source][source]¶
保存作為函數調用的 worker 進程的
PContext
。
- class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source][source]¶
保存作為二進制文件調用的 worker 進程的
PContext
。
- class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[source][source]¶
使用
start_processes()
啟動的進程完成運行的結果。 由PContext
返回。請注意以下事項
所有欄位都按本地 rank 進行映射
return_values
- 僅針對函數 (而非二進制文件) 填入。stdouts
- stdout.log 的路徑 (如果沒有重新導向,則為空字串)stderrs
- stderr.log 的路徑 (如果沒有重新導向,則為空字串)
- class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source][source]¶
預設 LogsSpecs 實作
如果 log_dir 不存在,將會建立它
為每次嘗試和 rank 產生巢狀資料夾。
- class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[source][source]¶
針對每種類型的日誌,保存本地 rank id 到檔案路徑的映射。