torchx.runner¶
執行器允許您在其中一個支援的 排程器 上將元件作為獨立作業執行。執行器會取得一個 specs.AppDef
物件,這是使用一組使用者提供的參數評估元件函數的結果,以及排程器名稱和排程器參數(又稱 runcfg
或 runopts
),並將元件作為作業提交(請參閱下圖)。

執行器函數¶
- torchx.runner.get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any) Runner [來源]¶
建構和取得 Runner 物件的便捷方法。用法
with get_runner() as runner: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle))
或者,
runner = get_runner() try: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle)) finally: runner.close()
- 參數::
name – 人類可讀的名稱,將作為所有啟動作業的一部分包含在內。
scheduler_params – 將傳遞給所有可用排程器的建構函數的額外參數。
執行器類別¶
- class torchx.runner.Runner(name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None)[來源]¶
TorchX 個別元件執行器。具有讓使用者對
AppDefs
採取行動的方法。Runner
將快取已啟動應用程式的相關資訊(如果它們是在本地啟動的),否則將由特定的排程器實作來處理。- cancel(app_handle: str) None [原始碼]¶
停止應用程式,有效地指示排程器取消作業。如果應用程式不存在,則不執行任何動作。
備註
此方法會在取消請求提交至排程器後立即返回。應用程式將處於
RUNNING
狀態,直到排程器實際終止作業為止。如果排程器成功中斷作業並終止它,則最終狀態將為CANCELLED
,否則將為FAILED
。
- close() None [原始碼]¶
關閉此執行器並釋放/清除任何已配置的資源。遞迴呼叫所有排程器上的
close()
方法。在此方法被呼叫到執行器後,該執行器物件將被視為無效,並且在該執行器物件以及與此執行器相關聯的排程器上呼叫的任何方法都具有未定義的行為。可以在同一個執行器物件上多次呼叫此方法。
- describe(app_handle: str) Optional[AppDef] [原始碼]¶
根據應用程式控點重建應用程式(盡可能)。請注意,重建的應用程式可能不是透過執行 API 提交的完整應用程式。可以重建多少應用程式取決於排程器。
- 返回:
AppDef 或 None(如果應用程式不再存在,或者排程器不支援描述應用程式控點)
- dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo [原始碼]¶
使用提供的執行設定,在給定的排程器上執行應用程式預先測試。實際上不會提交應用程式,而是返回原本會提交的內容。返回的
AppDryRunInfo
格式良好,可以直接列印或記錄。用法
dryrun_info = session.dryrun(app, scheduler="local", cfg) print(dryrun_info)
- dryrun_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo [source]¶
run_component()
的預執行版本。實際上不會執行組件,只會返回「將會」執行的內容。
- list(scheduler: str) List[ListAppResponse] [source]¶
對於在排程器上啟動的應用程式,此 API 會返回 ListAppResponse 物件的清單,每個物件都包含應用程式 ID、應用程式控點及其狀態。注意:此 API 處於原型階段,可能會有所變更。
- log_lines(app_handle: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str] [source]¶
傳回指定作業容器的日誌行迭代器。
備註
k
是節點(主機)ID,而不是rank
。since
和until
不一定始終有效(取決於排程器)。
警告
傳回迭代器的語義和保證高度取決於排程器。有關此日誌迭代器的高階語義,請參閱
torchx.specs.api.Scheduler.log_iter
。因此,強烈建議不要使用此方法來產生要傳遞給下游函數/依賴項的輸出。此方法不保證會傳回 100% 的日誌行。如果排程器已完全或部分清除應用程式的日誌記錄,則此方法完全可以傳回無或部分日誌行。傳回的行將包含空白字元,例如
\n
或\r
。輸出行時,應確保避免添加額外的換行字元。用法
app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]()) print("== trainer node 0 logs ==") for line in session.log_lines(app_handle, "trainer", k=0): # for prints newlines will already be present in the line print(line, end="") # when writing to a file nothing extra is necessary f.write(line)
不建議使用的反模式
# DO NOT DO THIS! # parses accuracy metric from log and reports it for this experiment run accuracy = -1 for line in session.log_lines(app_handle, "trainer", k=0): if matches_regex(line, "final model_accuracy:[0-9]*"): accuracy = parse_accuracy(line) break report(experiment_name, accuracy)
- 參數::
app_handle - 應用程式控點
role_name - 應用程式中的角色(例如訓練器)
k - 要擷取其日誌的角色的第 k 個副本
regex - 可選的正規表示式過濾器,如果保留為空,則傳回所有行
since - 基於日期時間的開始游標。如果保留為空,則從第一個日誌行(作業開始)開始。
until - 基於日期時間的結束游標。如果保留為空,則會追蹤日誌輸出,直到作業完成且已耗用所有日誌行為止。
- 返回:
指定應用程式的角色第 k 個副本的迭代器。
- 引發:
UnknownAppException - 如果應用程式不存在於排程器中
- run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str [source]¶
以指定模式執行指定的應用程式。
備註
Runner
的子類別應該實作schedule
方法,而不是直接覆寫這個方法。- 返回:
用於在應用程式上呼叫其他動作 API 的應用程式控點。
- run_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str [source]¶
執行一個元件。
component
具有以下解析順序(由高到低)- 使用者註冊的元件。使用者可以透過以下方式註冊元件
https://packaging.python.org/specifications/entry-points/。方法會在
torchx.components
群組中尋找入口點。
- 相對於 torchx.components 的內建元件。元件的路徑應該
是相對於 torchx.components 的模組名稱和函式名稱,格式為:
$module.$function
。
- 基於檔案的元件,格式為:
$FILE_PATH:FUNCTION_NAME
。支援相對和 絕對路徑。
- 基於檔案的元件,格式為:
用法
# resolved to torchx.components.distributed.ddp() runner.run_component("distributed.ddp", ...) # resolved to my_component() function in ~/home/components.py runner.run_component("~/home/components.py:my_component", ...)
- 返回:
用於在應用程式上呼叫其他動作 API 的應用程式控點
- 引發:
ComponentValidationException – 如果元件無效。
ComponentNotFoundException – 如果
component_path
無法解析。
- schedule(dryrun_info: AppDryRunInfo) str [source]¶
實際上根據給定的預先執行資訊執行應用程式。當需要覆寫排程器請求中無法從物件 API 設定的參數時很有用。
警告
請謹慎使用,因為濫用此方法來覆寫原始排程器請求中的許多參數,可能會導致您使用 TorchX 的方式長期來看不符合規範。此方法旨在讓使用者在短期內能夠嘗試某些特定於排程器的功能,而不必等到 TorchX 在其 API 中公開排程器功能。
備註
建議
Session
的子類別實作這個方法,而不是直接實作run
方法。用法
dryrun_info = session.dryrun(app, scheduler="default", cfg) # overwrite parameter "foo" to "bar" dryrun_info.request.foo = "bar" app_handle = session.submit(dryrun_info)
- scheduler_run_opts(scheduler: str) runopts [原始碼]¶
返回支援的排程器後端的
runopts
。用法
local_runopts = session.scheduler_run_opts("local_cwd") print("local scheduler run options: {local_runopts}")
- 返回:
指定排程器類型的
runopts
。
- status(app_handle: str) Optional[AppStatus] [原始碼]¶
- 返回:
應用程式的狀態,如果應用程式不再存在(例如,過去已停止並從排程器的後端移除),則為
None
。