捷徑

torchx.specs

此處包含 TorchX AppDef 和相關元件定義。元件使用這些定義來定義應用程式,然後可以透過 TorchX 排程器或管道配接器啟動這些應用程式。

AppDef

class torchx.specs.AppDef(name: str, roles: ~typing.List[~torchx.specs.api.Role] = <factory>, metadata: ~typing.Dict[str, str] = <factory>)[來源]

表示由多個 角色 和中繼資料組成的分散式應用程式。包含驅動程式將此應用程式提交至排程器所需的必要資訊。

參數
  • name – 應用程式名稱

  • roles – 角色清單

  • metadata – 應用程式的中繼資料(中繼資料的處理方式取決於排程器)

Role

class torchx.specs.Role(name: str, image: str, min_replicas: ~typing.Optional[int] = None, base_image: ~typing.Optional[str] = None, entrypoint: str = '<MISSING>', args: ~typing.List[str] = <factory>, env: ~typing.Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: ~torchx.specs.api.RetryPolicy = RetryPolicy.APPLICATION, resource: ~torchx.specs.api.Resource = <factory>, port_map: ~typing.Dict[str, int] = <factory>, metadata: ~typing.Dict[str, ~typing.Any] = <factory>, mounts: ~typing.List[~typing.Union[~torchx.specs.api.BindMount, ~torchx.specs.api.VolumeMount, ~torchx.specs.api.DeviceMount]] = <factory>)[來源]

一組在 AppDef 中執行特定任務的節點。範例

  1. 分散式資料平行應用程式 - 由單一角色(訓練器)組成。

  2. 具有參數伺服器的應用程式 - 由多個角色(訓練器、ps)組成。

注意

映像檔 是一個軟體套件,安裝在由排程器排定的容器上。排程器上的容器決定了映像檔的實際內容。映像檔可以像 tarball 一樣簡單,也可以對應到 Docker 映像檔。排程器通常知道如何「提取」給定映像檔名稱(字串)的映像檔,該名稱可以是簡單的名稱(例如 Docker 映像檔)或網址,例如 s3://path/my_image.tar)。

用法

trainer = Role(name="trainer",
               image = "pytorch/torch:1",
               entrypoint = "my_trainer.py"
               args = ["--arg", "foo", ENV_VAR="FOOBAR"],
               num_replicas = 4,
               resource = Resource(cpu=1, gpu=1, memMB=500),
               port_map={"tcp_store":8080, "tensorboard": 8081},
               metadata={"local_cwd.property", value})
參數
  • name – 角色名稱

  • image – 安裝在容器上的軟體套件。

  • entrypoint –(在容器內)呼叫角色的命令

  • args – 傳遞給 entrypoint 命令的命令列參數

  • env – 環境變數映射

  • num_replicas – 要執行的容器複本數

  • min_replicas – 啟動作業所需的複本最小數量。設定後,作業大小會根據叢集資源和策略,在 min_replicas 和 num_replicas 之間自動調整。如果排程器不支援自動調整規模,則會忽略此欄位,並且作業大小將為 num_replicas。實驗性:對於 HOT_SPARE 重啟策略,此欄位用於指示作業執行所需的仲裁數。

  • max_retries – 放棄前的最大重試次數

  • retry_policy – 複本失敗時的重試行為

  • resource – 角色的資源需求。角色應由排程器在 num_replicas 個容器上排程,每個容器都應至少具有 resource 個保證。

  • port_map – 角色的連接埠對應。金鑰是連接埠的唯一識別碼,例如「tensorboard」:9090

  • metadata – 與角色相關聯的自由格式資訊,例如排程器特定資料。金鑰應遵循以下模式: $scheduler.$key

  • mounts – 機器上的掛載清單

pre_proc(scheduler: str, dryrun_info: AppDryRunInfo) AppDryRunInfo[source]

根據特定角色的配置修改排程器請求。在排程器 submit_dryrun 期間,會針對每個角色呼叫此方法。如果有多個角色,則會按照 AppDef.roles 清單定義的順序針對每個角色呼叫此方法。

class torchx.specs.RetryPolicy(value)[source]

定義 AppDefRoles 的重試策略。該策略定義了當角色複本遇到失敗時的行為

  1. 不成功的(非零)退出代碼

  2. 硬體/主機當機

  3. 搶佔

  4. 驅逐

注意

並非所有排程器都支援所有重試策略。但是,所有排程器都必須支援 RetryPolicy.APPLICATION。有關排程器支援的重試策略和行為注意事項(如有)的詳細資訊,請參閱排程器的文件。

  1. REPLICA:替換複本實例。倖存的複本不會受到影響。

    dist.ddp 元件一起使用,讓 torchelastic 協調重新啟動和成員資格變更。否則,將由應用程式自行處理失敗的複本離開和替換複本的加入。

  2. APPLICATION:重新啟動整個應用程式。

  3. HOT_SPARE:只要仲裁數 (min_replicas)

    沒有因為使用額外主機作為備用主機而受到侵犯,就會重新啟動角色的複本。它實際上不支援彈性,只是使用 num_replicas 和 min_replicas 之間的差值作為備用主機(實驗性)。

資源

class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: ~typing.Dict[str, ~typing.Any] = <factory>, devices: ~typing.Dict[str, int] = <factory>)[source]

表示 Role 的資源需求。

參數
  • cpu – 邏輯 CPU 核心的數量。CPU 核心的定義取決於排程器。有關邏輯 CPU 核心如何對應到實體核心和執行緒的資訊,請參閱您的排程器文件。

  • gpu – GPU 的數量

  • memMB – RAM 的 MB 數

  • capabilities – 其他硬體規格(由排程器解釋)

  • devices – 具有其數量的已命名裝置清單

注意:您應該優先使用 named_resources,而不是直接指定原始資源需求。

static copy(original: Resource, **capabilities: Any) Resource[source]

複製資源並套用新的功能。如果原始資源和參數中存在相同的功能,則將使用參數中的功能。

torchx.specs.resource(cpu: Optional[int] = None, gpu: Optional[int] = None, memMB: Optional[int] = None, h: Optional[str] = None) Resource[source]

一種便捷方法,可以用於從原始資源規格(cpu、gpu、memMB)或已註冊的命名資源(h)建立 Resource 物件。請注意,(cpu、gpu、memMB)與 h 互斥,如果指定了 h,則優先使用 h

如果指定了 h,則會使用它從已註冊的命名資源清單中查詢資源規格。請參閱註冊命名資源

否則,將從原始資源規範中創建一個 Resource 物件。

範例

resource(cpu=1) # returns Resource(cpu=1)
resource(named_resource="foobar") # returns registered named resource "foo"
resource(cpu=1, named_resource="foobar") # returns registered named resource "foo" (cpu=1 ignored)
resource() # returns default resource values
resource(cpu=None, gpu=None, memMB=None) # throws
torchx.specs.get_named_resources(res: str) Resource[原始碼]

根據透過 entrypoints.txt 註冊的字串定義獲取資源物件。

TorchX 实现了 named_resource 注册机制,该机制包含以下步骤

  1. 創建一個模組並定義您的資源檢索函數

# my_module.resources
from typing import Dict
from torchx.specs import Resource

def gpu_x_1() -> Dict[str, Resource]:
    return Resource(cpu=2, memMB=64 * 1024, gpu = 2)
  1. 在 entrypoints 區段中註冊資源檢索

[torchx.named_resources]
gpu_x_1 = my_module.resources:gpu_x_1

gpu_x_1 可以作為字串參數用於此函數

from torchx.specs import named_resources
resource = named_resources["gpu_x_1"]

AWS 命名資源

torchx.specs.named_resources_aws 包含表示對應 AWS 執行個體類型的資源定義,這些執行個體類型取自 https://aws.amazon.com/ec2/instance-types/。安裝 torchx 庫後,資源會透過 entrypoints 公開。映射儲存在 setup.py 檔案中。

命名資源目前沒有指定 AWS 執行個體類型的功能,僅表示記憶體、CPU 和 GPU 數量中的等效資源。

注意

這些資源定義在未來可能會發生變化。預計每個用戶都將管理自己的資源。請遵循 https://pytorch.dev.org.tw/torchx/latest/specs.html#torchx.specs.get_named_resources 來設置命名資源。

用法

from torchx.specs import named_resources
print(named_resources["aws_t3.medium"])
print(named_resources["aws_m5.2xlarge"])
print(named_resources["aws_p3.2xlarge"])
print(named_resources["aws_p3.8xlarge"])
torchx.specs.named_resources_aws.aws_m5_2xlarge() Resource[原始碼]
torchx.specs.named_resources_aws.aws_p3_2xlarge() Resource[原始碼]
torchx.specs.named_resources_aws.aws_p3_8xlarge() Resource[原始碼]
torchx.specs.named_resources_aws.aws_t3_medium() Resource[原始碼]

巨集

class torchx.specs.macros[原始碼]

定義可在 Role.args 元素的值和 Role.env 的值中使用的巨集。巨集將在運行時替換為其實際值。

警告

除了上面提到的欄位之外,在 Role 的欄位中使用的巨集不會被替換。

可用的巨集

  1. img_root - 已提取的 container.image 的根目錄

  2. app_id - 由排程器分配的應用程式 ID

  3. replica_id - 角色副本的每個執行個體的唯一 ID,

    例如,具有 3 個副本的角色可以將 0、1、2 作為副本 ID。請注意,當容器失敗並被替換時,新容器將具有與其替換的容器相同的 replica_id。例如,如果節點 1 失敗並被排程器替換,則替換節點也將具有 replica_id=1

範例

# runs: hello_world.py --app_id ${app_id}
trainer = Role(
           name="trainer",
           entrypoint="hello_world.py",
           args=["--app_id", macros.app_id],
           env={"IMAGE_ROOT_DIR": macros.img_root})
app = AppDef("train_app", roles=[trainer])
app_handle = session.run(app, scheduler="local_docker", cfg={})
class Values(img_root: str, app_id: str, replica_id: str, rank0_env: str, base_img_root: str = 'DEPRECATED')[原始碼]
apply(role: Role) Role[原始碼]

apply 會將值應用於指定角色的副本並返回它。

substitute(arg: str) str[原始碼]

substitute 會將值應用於模板參數。

執行組態

class torchx.specs.runopts[原始碼]

保存已接受的排程器執行組態鍵、預設值(如果有的話)和說明訊息字串。這些選項由 Scheduler 提供,並在 Session.run 中針對用戶提供的執行組態進行驗證。允許 None 預設值。必要的選項不能具有非 None 的預設值。

重要

此類別沒有訪問器,因為它旨在由 Scheduler.run_config_options 構造和返回,並作為“說明”工具或異常訊息的一部分列印出來。

用法

opts = runopts()

opts.add("run_as_user", type_=str, help="user to run the job as")
opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
opts.add("priority", type_=float, default=0.5, help="job priority")
opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

# invalid
opts.add("illegal", default=10, required=True)
opts.add("bad_type", type=str, default=10)

opts.check(cfg)
print(opts)
add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str], Dict[str, str]]] = None, required: bool = False) None[source]

使用指定的說明字串和 default 值(如果有的話)新增 config 選項。如果未指定 default,則此選項為必填選項。

cfg_from_str(cfg_str: str) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]][source]

從字串字面值解析排程器 cfg,並返回一個 cfg 映射,其中 cfg 值已根據此 runopts 物件的指定轉換為適當的類型。未知的鍵將被忽略,並且不會返回到結果映射中。

注意

resolve 方法不同,此方法不會解析預設選項,也不會檢查給定的 cfg_str 中是否實際存在所需的選項。此方法旨在在呼叫 resolve() 之前呼叫,當輸入是字串編碼的運行 cfg 時。也就是說,要完全解析 cfg,請呼叫 opt.resolve(opt.cfg_from_str(cfg_literal))

如果 cfg_str 是一個空字串,則返回一個空的 cfg。否則,預計至少有一個由 "="(等於)分隔的鍵值對。

可以使用 ","(逗號)或 ";"(分號)分隔多個鍵值對。

CfgVal 允許使用 List 類型的原始值,可以使用 ","";"(分號)分隔。由於相同的定界符用於分隔 cfg 鍵值對,因此此方法將最後(尾部)的 ","";" 解釋為鍵值對之間的定界符。請參閱下面的範例。

範例

opts = runopts()
opts.add("FOO", type_=List[str], default=["a"], help="an optional list option")
opts.add("BAR", type_=str, required=True, help="a required str option")

# required and default options not checked
# method returns strictly parsed cfg from the cfg literal string
opts.cfg_from_str("") == {}

# however, unknown options are ignored
# since the value type is unknown hence cannot cast to the correct type
opts.cfg_from_str("UNKNOWN=VALUE") == {}

opts.cfg_from_str("FOO=v1") == {"FOO": "v1"}

opts.cfg_from_str("FOO=v1,v2") == {"FOO": ["v1", "v2"]}
opts.cfg_from_str("FOO=v1;v2") == {"FOO": ["v1", "v2"]}

opts.cfg_from_str("FOO=v1,v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2;BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
get(name: str) Optional[runopt][原始碼]

如果已註冊選項,則返回選項,否則返回 None

static is_type(obj: Optional[Union[str, int, float, bool, List[str], Dict[str, str]]], tp: Type[Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]) bool[原始碼]

如果 objtp 類型,則返回 True。類似於 isinstance() 但支援 tp = List[str],因此可以用於驗證 ConfigValue。

resolve(cfg: Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]][原始碼]

根據此 runopts 檢查給定的配置,如果未設置,則設置預設配置。

注意

此運行選項未知的額外配置將被忽略。

執行狀態

class torchx.specs.AppStatus(state: ~torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: ~typing.Optional[str] = None, roles: ~typing.List[~torchx.specs.api.RoleStatus] = <factory>)[原始碼]

AppDef 的執行階段狀態。排程器可以返回任意文字訊息(msg 欄位)。如果發生任何錯誤,排程器可以使用 json 回應填充 structured_error_msg

replicas 表示作業中副本的狀態。如果作業以多次重試執行,則參數將包含最近一次重試的狀態。注意:如果先前的重試失敗,但最近一次重試成功或正在進行中,則 replicas 將不會包含發生的錯誤。

format(filter_roles: Optional[List[str]] = None) str[原始碼]
格式化應用程式狀態的日誌。應用程式狀態包括
  1. 狀態:應用程式的狀態。

  2. 重新啟動次數:應用程式重新啟動的次數。

  3. 角色:角色清單。

  4. 訊息:排程器返回的任意文字訊息。

  5. 結構化錯誤訊息:Json 回應錯誤訊息。

  6. 使用者介面網址:應用程式網址

raise_for_status() None[原始碼]

如果狀態不是 SUCCEEDED,則 raise_for_status 將引發 AppStatusError。

class torchx.specs.AppState(value)[原始碼]

應用程式的狀態。應用程式從初始 UNSUBMITTED 狀態開始,並經歷 SUBMITTEDPENDINGRUNNING 狀態,最終達到終端狀態:SUCCEEDEDFAILEDCANCELLED

如果排程器支援搶佔,則應用程式會在搶佔時從 RUNNING 狀態轉移到 PENDING 狀態。

如果使用者停止應用程式,則應用程式狀態會轉移到 STOPPED 狀態,然後在排程器實際取消作業時轉移到 CANCELLED 狀態。

  1. UNSUBMITTED - 應用程式尚未提交到排程器

  2. SUBMITTED - 應用程式已成功提交到排程器

  3. PENDING - 應用程式已提交到排程器,正在等待分配

  4. RUNNING - 應用程式正在執行

  5. SUCCEEDED - 應用程式已成功完成

  6. FAILED - 應用程式未成功完成

  7. CANCELLED - 應用程式在完成之前被取消

  8. UNKNOWN - 應用程式狀態未知

torchx.specs.ReplicaState

AppState 的別名

掛載

torchx.specs.parse_mounts(opts: List[str]) List[Union[BindMount, VolumeMount, DeviceMount]][原始碼]

parse_mounts 會將選項清單解析為遵循類似於 Dockers 繫結掛載格式的類型化掛載。

可以在同一個清單中指定多個掛載。type 必須在每個掛載中首先指定。

範例

type=bind,src=/host,dst=/container,readonly,[type=bind,src=…,dst=…]

支援的類型

BindMount: type=bind,src=<主機路徑>,dst=<容器路徑>[,readonly] VolumeMount: type=volume,src=<名稱/id>,dst=<容器路徑>[,readonly] DeviceMount: type=device,src=/dev/<裝置>[,dst=<容器路徑>][,perm=rwm]

class torchx.specs.BindMount(src_path: str, dst_path: str, read_only: bool = False)[原始碼]

定義一個繫結掛載,以使用 mount –bind 將主機路徑繫結到工作者環境中。有關繫結掛載如何針對每個排程器運作的資訊,請參閱排程器文件。

參數
  • src_path – 主機上的路徑

  • dst_path – 工作者環境/容器中的路徑

  • read_only – 掛載是否應為唯讀

class torchx.specs.VolumeMount(src: str, dst_path: str, read_only: bool = False)[原始碼]

定義要掛載到工作環境中的永久性磁碟區掛載。 :param src: 要掛載的磁碟區的名稱或 ID :param dst_path: 工作環境/容器中的路徑 :param read_only: 掛載是否應為唯讀

class torchx.specs.DeviceMount(src_path: str, dst_path: str, permissions: str = 'rwm')[原始碼]

定義要掛載到容器中的主機裝置。 :param src_path: 主機上的路徑 :param dst_path: 工作環境/容器中的路徑 :param permissions: 要在裝置上設定的權限。 預設值: 讀取、寫入、mknode

元件 Linter

torchx.specs.file_linter.validate(path: str, component_function: str) List[LinterMessage][原始碼]

驗證函數以確保其符合元件標準。

validate 尋找 component_function 並根據以下規則驗證它

  1. 函數必須具有 Google 風格文件

  2. 所有函數參數都必須加上註解

  3. 函數必須回傳 torchx.specs.api.AppDef

參數
  • path – Python 原始碼檔案的路徑。

  • component_function – 要驗證的函數名稱。

回傳值:

驗證錯誤清單

回傳型別:

List[LinterMessage]

torchx.specs.file_linter.get_fn_docstring(fn: Callable[[...], object]) Tuple[str, Dict[str, str]][原始碼]

從提供的函數中解析函數和參數描述。 文件字串應採用 Google 風格格式

如果函數沒有文件字串,則函數描述將是函數的名稱,有關如何改進說明訊息和參數描述的提示將是參數的名稱。

文件字串中不存在的參數將包含預設/必要資訊

參數

fn – 具有或不具有文件字串的函數

回傳值:

函數描述,參數描述,其中 key 是參數的名稱,而 value 是描述

如果描述

class torchx.specs.file_linter.LinterMessage(name: str, description: str, line: int, char: int, severity: str = 'error')[原始碼]
class torchx.specs.file_linter.TorchFunctionVisitor(component_function_name: str)[原始碼]

尋找 component_function 並對其執行已註冊驗證器的訪問器。 目前已註冊的驗證器

  • TorchxFunctionArgsValidator - 驗證函數的參數。
    準則
    • 每個參數都應使用型別加上註解

    • 支援以下型別
      • primitive_types: {int, str, float},

      • Optional[primitive_types],

      • Dict[primitive_types, primitive_types],

      • List[primitive_types],

      • Optional[Dict[primitive_types, primitive_types]],

      • Optional[List[primitive_types]]

visit_FunctionDef(node: FunctionDef) None[原始碼]

使用子驗證器驗證函數定義。

類別 torchx.specs.file_linter.TorchXArgumentHelpFormatter(prog, indent_increment=2, max_help_position=24, width=None)[原始碼]

說明訊息格式器,用於將預設值和必要資訊添加到參數說明中。

如果參數為必要參數,則類別會在說明訊息的末尾附加 (必要)。 如果參數具有預設值,則類別會在末尾附加 (預設值: $DEFAULT)。 此格式器僅設計用於 torchx 元件函數。 這些函數不能同時具有必要參數和預設參數。

類別 torchx.specs.file_linter.TorchxFunctionArgsValidator[原始碼]
validate(app_specs_func_def: FunctionDef) List[LinterMessage][原始碼]

用於驗證提供的函數定義的方法。

類別 torchx.specs.file_linter.TorchxFunctionValidator[原始碼]
抽象 validate(app_specs_func_def: FunctionDef) List[LinterMessage][原始碼]

用於驗證提供的函數定義的方法。

類別 torchx.specs.file_linter.TorchxReturnValidator[原始碼]
validate(app_specs_func_def: FunctionDef) List[LinterMessage][原始碼]
驗證 torchx 函數的返回註釋。 目前允許的註釋
  • AppDef

  • specs.AppDef

文件

取得 PyTorch 的完整開發人員文件

查看文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

查看教學課程

資源

尋找開發資源並取得問題解答

查看資源