• 教學 >
  • 將分散式資料平行與分散式 RPC 框架結合
捷徑

將分散式資料平行與分散式 RPC 框架結合

建立於:2020 年 7 月 28 日 | 最後更新:2023 年 6 月 06 日 | 最後驗證:未驗證

作者Pritam DamaniaYi Wang

注意

editgithub 中檢視與編輯此教學。

本教學使用一個簡單的範例來示範如何結合 DistributedDataParallel (DDP) 與 Distributed RPC framework,以結合分散式資料平行與分散式模型平行來訓練一個簡單的模型。 範例的原始碼可以在此處找到。

之前的教學,分散式資料平行入門分散式 RPC 框架入門,分別描述了如何執行分散式資料平行和分散式模型平行訓練。 雖然,在某些訓練範例中,您可能想要結合這兩種技術。 例如

  1. 如果我們有一個具有稀疏部分 (大型嵌入表) 和密集部分 (FC 層) 的模型,我們可能想要將嵌入表放在參數伺服器上,並使用 DistributedDataParallel 在多個訓練器之間複製 FC 層。 Distributed RPC framework 可以用於在參數伺服器上執行嵌入查找。

  2. 啟用 PipeDream 論文中描述的混合平行。 我們可以使用 Distributed RPC framework 來在多個 worker 之間管線化模型的階段,並使用 DistributedDataParallel 複製每個階段(如果需要)。


在本教學中,我們將介紹上面提到的案例 1。 我們的設定總共有 4 個 worker 如下

  1. 1 個 Master,負責在參數伺服器上建立嵌入表 (nn.EmbeddingBag)。 Master 還會在兩個訓練器上驅動訓練迴圈。

  2. 1 個參數伺服器,基本上將嵌入表保存在記憶體中,並回應來自 Master 和訓練器的 RPC。

  3. 2 個訓練器,儲存使用 DistributedDataParallel 在彼此之間複製的 FC 層 (nn.Linear)。 訓練器還負責執行正向傳遞、反向傳遞和最佳化器步驟。


整個訓練過程如下執行

  1. Master 建立一個 RemoteModule,該模組在參數伺服器上保存一個嵌入表。

  2. 然後,Master 啟動訓練器上的訓練迴圈,並將遠端模組傳遞給訓練器。

  3. 訓練器建立一個 HybridModel,它首先使用 Master 提供的遠端模組執行嵌入查找,然後執行包裝在 DDP 內的 FC 層。

  4. 訓練器執行模型的正向傳遞,並使用損失來使用 Distributed Autograd 執行反向傳遞。

  5. 作為反向傳遞的一部分,首先計算 FC 層的梯度,並透過 DDP 中的 allreduce 同步到所有訓練器。

  6. 接下來,Distributed Autograd 將梯度傳播到參數伺服器,其中嵌入表的梯度會被更新。

  7. 最後,Distributed Optimizer 用於更新所有參數。

注意

如果結合使用 DDP 和 RPC,您應該始終使用 Distributed Autograd 進行反向傳遞。

現在,讓我們詳細了解每個部分。 首先,我們需要設定所有 worker,然後才能執行任何訓練。 我們建立 4 個處理序,使 rank 0 和 1 是我們的訓練器,rank 2 是 Master,rank 3 是參數伺服器。

我們使用 TCP init_method 在所有 4 個 worker 上初始化 RPC 框架。 RPC 初始化完成後,Master 使用 RemoteModule 在參數伺服器上建立一個保存 EmbeddingBag 層的遠端模組。 然後,Master 迴圈遍歷每個訓練器,並透過使用 rpc_async 在每個訓練器上呼叫 _run_trainer 來啟動訓練迴圈。 最後,Master 等待所有訓練完成後再退出。

訓練器首先使用 init_process_group,使用 world_size=2(適用於兩個訓練器)初始化 DDP 的 ProcessGroup。 接下來,他們使用 TCP init_method 初始化 RPC 框架。 請注意,RPC 初始化和 ProcessGroup 初始化中的埠不同。 這是為了避免兩個框架初始化之間的埠衝突。 初始化完成後,訓練器只需等待來自 Master 的 _run_trainer RPC。

參數伺服器只需初始化 RPC 框架,並等待來自訓練器和 Master 的 RPC。

def run_worker(rank, world_size):
    r"""
    A wrapper function that initializes RPC, calls the function, and shuts down
    RPC.
    """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://127.0.0.1:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule(
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://127.0.0.1:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for RPCs from master.
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

在我們討論 Trainer 的細節之前,讓我們先介紹 Trainer 使用的 HybridModel。如下所述,HybridModel 是使用一個遠端模組初始化的,該模組在參數伺服器上保存一個 embedding table(remote_emb_module),以及用於 DDP 的 device。模型的初始化會將一個 nn.Linear 層包裝在 DDP 內部,以便在所有 Trainer 之間複製和同步這一層。

模型的 forward 方法非常直接。它使用 RemoteModule 的 forward 在參數伺服器上執行 embedding 查詢,並將其輸出傳遞到 FC 層。

class HybridModel(torch.nn.Module):
    r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

接下來,讓我們看看 Trainer 上的設定。Trainer 首先使用一個遠端模組創建上述的 HybridModel,該模組在參數伺服器上保存 embedding table 以及 Trainer 自身的 rank。

現在,我們需要檢索一個 RRefs 列表,其中包含我們想要使用 DistributedOptimizer 優化的所有參數。要從參數伺服器檢索 embedding table 的參數,我們可以呼叫 RemoteModule 的 remote_parameters,它基本上會遍歷 embedding table 的所有參數,並返回一個 RRefs 列表。Trainer 通過 RPC 在參數伺服器上呼叫此方法,以接收指向所需參數的 RRefs 列表。由於 DistributedOptimizer 總是需要一個指向需要優化的參數的 RRefs 列表,即使是 FC 層的本地參數,我們也需要創建 RRefs。這是通過遍歷 model.fc.parameters(),為每個參數創建一個 RRef,並將其附加到從 remote_parameters() 返回的列表中來完成的。請注意,我們不能使用 model.parameters(),因為它會遞迴呼叫 model.remote_emb_module.parameters(),這是 RemoteModule 不支援的。

最後,我們使用所有 RRefs 創建我們的 DistributedOptimizer,並定義一個 CrossEntropyLoss 函數。

def _run_trainer(remote_emb_module, rank):
    r"""
    Each trainer runs a forward pass which involves an embedding lookup on the
    parameter server and running nn.Linear locally. During the backward pass,
    DDP is responsible for aggregating the gradients for the dense part
    (nn.Linear) and distributed autograd ensures gradients updates are
    propagated to the parameter server.
    """

    # Setup the model.
    model = HybridModel(remote_emb_module, rank)

    # Retrieve all model parameters as rrefs for DistributedOptimizer.

    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()

    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters():
        model_parameter_rrefs.append(RRef(param))

    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

現在我們可以介紹在每個 Trainer 上運行的主要訓練迴圈。get_next_batch 只是一个輔助函數,用於生成隨機輸入和目標以進行訓練。我們運行訓練迴圈多個 epoch,並且對於每個 batch

  1. 為 Distributed Autograd 設置一個 Distributed Autograd Context

  2. 運行模型的前向傳播並檢索其輸出。

  3. 使用損失函數,根據我們的輸出和目標計算損失。

  4. 使用 Distributed Autograd 執行一個使用損失的 distributed backward pass。

  5. 最後,運行 Distributed Optimizer 步驟來優化所有參數。

    def get_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1

            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target

    # Train for 100 epochs
    for epoch in range(100):
        # create distributed autograd context
        for indices, offsets, target in get_next_batch(rank):
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)

                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])

                # Tun distributed optimizer
                opt.step(context_id)

                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch))

整個範例的原始程式碼可以在這裡找到。

文件

取得 PyTorch 的完整開發者文件

檢視文件

教學

取得針對初學者和進階開發者的深入教學

檢視教學

資源

尋找開發資源並取得問題解答

檢視資源