快捷方式

規劃器

TorchRec 規劃器負責決定用於分散式訓練和推論的最佳效能、平衡的分片計畫。

用於產生分片計畫的主要 API 是 EmbeddingShardingPlanner.plan

class torchrec.distributed.types.ShardingPlan(plan: Dict[str, ModuleShardingPlan])

分片計畫的表示。這使用較大的封裝模型的 FQN(即使用 DistributedModelParallel 封裝的模型)。當需要 TorchRec 的可組合性時,應使用 EmbeddingModuleShardingPlan。

plan

以參數名稱鍵控的參數分片規格字典的模組路徑鍵控的字典。

類型:

Dict[str, EmbeddingModuleShardingPlan]

get_plan_for_module(module_path: str) Optional[ModuleShardingPlan]
參數:

module_path (str) –

回傳:

以參數名稱鍵控的參數分片規格字典。如果給定的 module_path 不存在分片規格,則為 None。

回傳類型:

Optional[ModuleShardingPlan]

class torchrec.distributed.planner.planners.EmbeddingShardingPlanner(topology: Optional[Topology] = None, batch_size: Optional[int] = None, enumerator: Optional[Enumerator] = None, storage_reservation: Optional[StorageReservation] = None, proposer: Optional[Union[Proposer, List[Proposer]]] = None, partitioner: Optional[Partitioner] = None, performance_model: Optional[PerfModel] = None, stats: Optional[Union[Stats, List[Stats]]] = None, constraints: Optional[Dict[str, ParameterConstraints]] = None, debug: bool = True, callbacks: Optional[List[Callable[[List[ShardingOption]], List[ShardingOption]]]] = None)

根據提供的分片器(sharders)、拓撲(topology)和約束(constraints),為具有可分片參數的給定模組提供最佳化的分片計畫。

參數:
  • topology (Optional[Topology]) – 當前進程群組的拓撲。

  • batch_size (Optional[int]) – 模型(model)的批次大小(batch size)。

  • enumerator (Optional[Enumerator]) – 要使用的枚舉器(enumerator)。

  • storage_reservation (Optional[StorageReservation]) – 要使用的儲存保留(storage reservation)。

  • proposer (Optional[Union[Proposer, List[Proposer]]]) – 要使用的提議者(proposer)。

  • partitioner (Optional[Partitioner]) – 要使用的分割器(partitioner)。

  • performance_model (Optional[PerfModel]) – 要使用的效能模型(performance model)。

  • stats (Optional[Union[Stats, List[Stats]]]) – 要使用的統計資料(stats)。

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 每個表格的分片約束(constraints)。

  • debug (bool) – 是否印出除錯(debug)訊息。

範例

ebc = EmbeddingBagCollection(tables=eb_configs, device=torch.device("meta"))
planner = EmbeddingShardingPlanner()
plan = planner.plan(
    module=ebc,
    sharders=[EmbeddingBagCollectionSharder()],
)
collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]]] = None, pg: Optional[ProcessGroup] = None) ShardingPlan

在 rank 0 上呼叫 self.plan(…),然後廣播(broadcast)。

參數:
  • module (nn.Module) – 要分片的模組(module)。

  • sharders (Optional[List[ModuleSharder[nn.Module]]]) – 用於分片的分片器(sharders)。

  • pg (Optional[dist.ProcessGroup]) – 用於集體操作的進程群組(process group)。

回傳:

此模組的分片計畫。

回傳類型:

ShardingPlan (分片計畫)

plan(module: Module, sharders: List[ModuleSharder[Module]]) ShardingPlan

根據提供的分片器(sharders)、拓撲(topology)和約束(constraints),為具有可分片參數的給定模組提供最佳化的分片計畫。

參數:
  • module (nn.Module) – 要分片的模組(module)。

  • sharders (List[ModuleSharder[nn.Module]]) – 用於分片的 sharder 列表。

回傳:

此模組的分片計畫。

回傳類型:

ShardingPlan (分片計畫)

class torchrec.distributed.planner.enumerators.EmbeddingEnumerator(topology: Topology, batch_size: int, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: Optional[Union[ShardEstimator, List[ShardEstimator]]] = None, use_exact_enumerate_order: Optional[bool] = False)

為給定的 nn.Module 產生 embedding 分片選項,並考慮使用者提供的約束。

參數:
  • topology (Topology) – 裝置拓撲。

  • batch_size (int) – 批次大小。

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 參數名稱與提供的 ParameterConstraints 的字典。

  • estimator (Optional[Union[ShardEstimator, List[ShardEstimator]]]) – 分片效能估算器。

  • use_exact_enumerate_order (bool) – 是否按照確切的 name_children 枚舉順序枚舉可分片的參數。

enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption]

產生給定模組和 sharder 的相關分片選項。

參數:
  • module (nn.Module) – 要進行分片的模組。

  • sharders (List[ModuleSharder[nn.Module]]) – 為模組提供的 sharder。

回傳:

具有已填充值的有效分片選項。

回傳類型:

List[ShardingOption] (分片選項列表)

populate_estimates(sharding_options: List[ShardingOption]) None

請參閱類別描述。

class torchrec.distributed.planner.partitioners.GreedyPerfPartitioner(sort_by: SortBy = SortBy.STORAGE, balance_modules: bool = False)

貪婪式分割器。

參數:
  • sort_by (SortBy) – 依儲存或效能以降序對分片選項進行排序 (也就是說,大型表格將首先被放置)。

  • balance_modules (bool) – 是否首先依模組排序,其中較小的模組將首先排序。 實際上,這會以平衡的方式將表格放置在每個模組中。

partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption]

根據每個分片選項的 partition_by 屬性將分片選項放置在拓撲上。拓撲、儲存和效能會在放置結束時更新。

參數:
  • proposal (List[ShardingOption]) – 已填充的分片選項列表。

  • storage_constraint (Topology) – 裝置拓撲。

回傳:

所選計畫的分片選項列表。

回傳類型:

List[ShardingOption] (分片選項列表)

範例

sharding_options = [
        ShardingOption(partition_by="uniform",
                shards=[
                    Shards(storage=1, perf=1),
                    Shards(storage=1, perf=1),
                ]),
        ShardingOption(partition_by="uniform",
                shards=[
                    Shards(storage=2, perf=2),
                    Shards(storage=2, perf=2),
                ]),
        ShardingOption(partition_by="device",
                shards=[
                    Shards(storage=3, perf=3),
                    Shards(storage=3, perf=3),
                ])
        ShardingOption(partition_by="device",
                shards=[
                    Shards(storage=4, perf=4),
                    Shards(storage=4, perf=4),
                ]),
    ]
topology = Topology(world_size=2)

# First [sharding_options[0] and sharding_options[1]] will be placed on the
# topology with the uniform strategy, resulting in

topology.devices[0].perf.total = (1,2)
topology.devices[1].perf.total = (1,2)

# Finally sharding_options[2] and sharding_options[3]] will be placed on the
# topology with the device strategy (see docstring of `partition_by_device` for
# more details).

topology.devices[0].perf.total = (1,2) + (3,4)
topology.devices[1].perf.total = (1,2) + (3,4)

# The topology updates are done after the end of all the placements (the other
# in the example is just for clarity).
class torchrec.distributed.planner.storage_reservations.HeuristicalStorageReservation(percentage: float, parameter_multiplier: float = 6.0, dense_tensor_estimate: Optional[int] = None)

使用啟發式計算,為模型分片保留儲存空間。 儲存空間保留包含密集張量儲存空間、KJT 儲存空間,以及總儲存空間的額外百分比。

參數:
  • percentage (float) – 額外儲存百分比,用於保留作為超出儲存空間啟發式計算的誤差範圍。

  • parameter_multiplier (float) – 總參數儲存空間的啟發式乘數。

  • dense_tensor_estimate (Optional[int]) – 密集張量的儲存空間估計值,如果未提供,則使用預設的啟發式估計值。

class torchrec.distributed.planner.proposers.GreedyProposer(use_depth: bool = True, threshold: Optional[int] = None)

以貪婪的方式提出分片計畫。

根據效能對每個可分片參數的分片選項進行排序。 在每次迭代中,找到當前儲存空間使用量最大的參數,然後嘗試其下一個分片選項。

參數:
  • use_depth (bool) – 啟用後,fqn 的 sharding_options 將根據 max(shard.perf.total) 排序,否則 sharding_options 將根據 sum(shard.perf.total) 排序。

  • threshold (Optional[int]) – 提前停止的閾值。 指定後,當提議的 perf_rating 連續比 best_perf_rating 差時,提議者停止提議。

feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None

向提議者提供回饋。

參數:
  • partitionable (bool) – 該計畫是否可分割。

  • plan (Optional[List[ShardingOption]]) – 要提供回饋的計畫。

  • perf_rating (Optional[float]) – 該計畫的效能評級。

  • storage_constraint (Optional[Topology]) – 該計畫的儲存空間約束。

load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None

將搜尋空間載入到提議者中。

參數:
  • search_space (List[ShardingOption]) – 要載入的搜尋空間。

  • enumerator (Enumerator) – 用於產生搜尋空間的枚舉器。

propose() Optional[List[ShardingOption]]

提出分片計畫。

回傳:

提出的計畫。

回傳類型:

Optional[List[ShardingOption]]

class torchrec.distributed.planner.shard_estimators.EmbeddingPerfEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, is_inference: bool = False)

Embedding Wall Time Perf Estimator(嵌入層執行時間效能估算器)。 此估算器估算給定分片選項的執行時間。

參數:
  • topology (Topology) – 裝置拓撲。

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 參數約束。

  • is_inference (bool) – 是否估算器用於推論 (inference)。

estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None

估算給定分片選項的執行時間 (wall time)。

參數:
  • sharding_options (List[ShardingOption]) – 分片選項的列表。

  • sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – Sharder 映射。

classmethod perf_func_emb_wall_time(shard_sizes: List[List[int]], compute_kernel: str, compute_device: str, sharding_type: str, batch_sizes: List[int], world_size: int, local_world_size: int, input_lengths: List[float], input_data_type_size: float, table_data_type_size: float, output_data_type_size: float, fwd_a2a_comm_data_type_size: float, bwd_a2a_comm_data_type_size: float, fwd_sr_comm_data_type_size: float, bwd_sr_comm_data_type_size: float, num_poolings: List[float], hbm_mem_bw: float, ddr_mem_bw: float, hbm_to_ddr_mem_bw: float, intra_host_bw: float, inter_host_bw: float, bwd_compute_multiplier: float, weighted_feature_bwd_compute_multiplier: float, is_pooled: bool, is_weighted: bool = False, caching_ratio: Optional[float] = None, is_inference: bool = False, prefetch_pipeline: bool = False, expected_cache_fetches: float = 0, uneven_sharding_perf_multiplier: float = 1.0) List[Perf]

試圖將效能 (perfs) 建模為相對執行時間 (wall times) 的函數。

參數:
  • shard_sizes (List[List[int]]) – 每個分片的 (local_rows, local_cols) 列表。

  • compute_kernel (str) – 計算核心 (compute kernel)。

  • compute_device (str) – 計算裝置 (compute device)。

  • sharding_type (str) – tw, rw, cw, twrw, dp。

  • batch_sizes (List[int]) – 每個輸入特徵的批次大小 (batch size)。

  • world_size (int) – 所有主機的裝置數量。

  • local_world_size (int) – 每個主機的裝置數量。

  • input_lengths (List[float]) – 每個輸入查詢特徵的平均查找次數列表。

  • input_data_type_size (float) – 分散式 data_parallel 輸入的資料類型大小。

  • table_data_type_size (float) – 資料表的資料型態大小。

  • output_data_type_size (float) – 輸出嵌入 (embeddings) 的資料型態大小。

  • fwd_comm_data_type_size (float) – 在前向傳播 (forward communication) 期間,分散式資料並行 (distributed data_parallel) 輸入的資料型態大小。

  • bwd_comm_data_type_size (float) – 在反向傳播 (backward communication) 期間,分散式資料並行輸入的資料型態大小。

  • num_poolings (List[float]) – 每個樣本的 pooling 數量,通常為 1.0。

  • hbm_mem_bw (float) – 裝置 HBM 的頻寬。

  • ddr_mem_bw (float) – 系統 DDR 記憶體的頻寬。

  • hbm_to_ddr_bw (float) – 裝置 HBM 和系統 DDR 之間的頻寬。

  • intra_host_bw (float) – 單一主機內部的頻寬,例如多個執行緒。

  • inter_host_bw (float) – 兩個主機之間的頻寬,例如多部機器。

  • is_pooled (bool) – 如果嵌入輸出是 pooled (即 EmbeddingBag),則為 True;如果不是 pooled/sequential (即 Embedding),則為 False。

  • is_weighted (bool = False) – 如果模組是 EBC 且是 weighted,通常表示一個 id score list 特徵。

  • is_inference (bool = False) – 是否規劃用於推論 (inference)。

  • caching_ratio (Optional[float] = None) – 快取比例,用於決定裝置的頻寬。

  • prefetch_pipeline (bool = False) – 是否啟用預取管線 (prefetch pipeline)。

  • expected_cache_fetches (float) – 全域批次 (global batch) 中預期的快取提取 (cache fetches) 數量。

  • uneven_sharding_perf_multiplier (float = 1.0) – 用於考量不均勻分片 (sharding) 效能的乘數。

回傳:

每個分片 (shard) 的效能列表。

回傳類型:

List[float]

class torchrec.distributed.planner.shard_estimators.EmbeddingStorageEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, pipeline_type: PipelineType = PipelineType.NONE, run_embedding_at_peak_memory: bool = False, is_inference: bool = False)

嵌入儲存空間使用量估算器

參數:
  • topology (Topology) – 裝置拓撲。

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 參數約束。

  • pipeline_type (PipelineType) – 管線的類型 (如果有的話)。將決定記憶體估算期間的輸入複製因子 (replication factor)。

  • run_embedding_at_peak_memory (bool) –

    如果 embedding fwd/bwd 將在 HBM 使用量達到峰值時執行。當設定為 TRUE 時,embedding 前向/反向傳播期間的任何臨時記憶體分配,只要 output_dist 之前的輸出大小將被計入 HBM 儲存成本。否則,它們不會被計入,因為它們將被真正的記憶體峰值「隱藏」。

    僅當為向後兼容性設定 pipeline_type 時才生效(不影響使用舊的與管線無關的公式的模型)

    預設為 false,因為對於 RecSys 來說,這通常為 false,因為記憶體峰值發生在密集前向傳播結束/密集反向傳播開始時。

  • is_inference (bool) – 如果模型是推論模型。預設為 False。

estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None

估算每個分片選項 (sharding option) 的儲存成本。

參數:
  • sharding_options (List[ShardingOption]) – 分片選項的列表。

  • sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – 從模組類型到分片器 (sharder) 的映射。

文件

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources