• 教學 >
  • 分散式 RPC 框架入門
捷徑

分散式 RPC 框架入門

建立於:2020 年 1 月 1 日 | 最後更新:2025 年 1 月 21 日 | 最後驗證:2024 年 11 月 5 日

作者Shen Li

注意

editgithub 中檢視和編輯本教學。

先決條件

本教學使用兩個簡單的範例來示範如何使用 torch.distributed.rpc 套件建立分散式訓練,該套件最初在 PyTorch v1.4 中作為實驗性功能推出。 這兩個範例的原始碼可以在 PyTorch 範例中找到。

之前的教學課程,分散式數據平行入門使用 PyTorch 編寫分散式應用程式,描述了 DistributedDataParallel,它支援一種特定的訓練範例,其中模型在多個進程中複製,並且每個進程處理輸入數據的分割。 有時,您可能會遇到需要不同訓練範例的情況。 例如

  1. 在強化學習中,從環境中獲取訓練數據可能相對昂貴,而模型本身可能很小。 在這種情況下,產生多個並行運行的觀察器並共享單一代理可能很有用。 在這種情況下,代理在本地負責訓練,但應用程式仍然需要函式庫來在觀察器和訓練器之間發送和接收數據。

  2. 您的模型可能太大而無法放入單台機器上的 GPU 中,因此需要一個函式庫來幫助將模型拆分到多台機器上。 或者您可能正在實現一個 參數伺服器 訓練框架,其中模型參數和訓練器位於不同的機器上。

torch.distributed.rpc 套件可以幫助處理上述情況。 在情況 1 中,RPCRRef 允許將數據從一個工作人員發送到另一個工作人員,同時輕鬆引用遠端數據物件。 在情況 2 中,分散式自動梯度分散式優化器 使執行反向傳播和優化器步驟就像本地訓練一樣。 在接下來的兩個章節中,我們將使用強化學習範例和語言模型範例來示範 torch.distributed.rpc 的 API。 請注意,本教學不旨在建立最準確或最有效的模型來解決給定的問題,相反,這裡的主要目標是展示如何使用 torch.distributed.rpc 套件來建立分散式訓練應用程式。

使用 RPC 和 RRef 的分散式強化學習

本節描述使用 RPC 建立玩具分散式強化學習模型以從 OpenAI Gym 解決 CartPole-v1 的步驟。 策略程式碼主要借用自現有的單線程 範例,如下所示。 我們將跳過 Policy 設計的詳細資訊,並專注於 RPC 用法。

import torch.nn as nn
import torch.nn.functional as F

class Policy(nn.Module):

    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)

我們準備好展示觀察器。 在此範例中,每個觀察器建立自己的環境,並等待代理的命令來運行一個 episode。 在每個 episode 中,一個觀察器最多循環 n_steps 次迭代,並且在每次迭代中,它使用 RPC 將其環境狀態傳遞給代理並取回一個動作。 然後,它將該動作應用於其環境,並從環境中獲得獎勵和下一個狀態。 之後,觀察器使用另一個 RPC 向代理報告獎勵。 再次,請注意,這顯然不是最有效的觀察器實現。 例如,一種簡單的優化可能是將當前狀態和最後一次獎勵打包在一個 RPC 中,以減少通信開銷。 但是,目標是示範 RPC API,而不是為 CartPole 建立最佳解算器。 因此,讓我們保持邏輯簡單,並在此範例中明確兩個步驟。

import argparse
import gym
import torch.distributed.rpc as rpc

parser = argparse.ArgumentParser(
    description="RPC Reinforcement Learning Example",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument('--world_size', default=2, type=int, metavar='W',
                    help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
                    help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed  for reproducibility')
args = parser.parse_args()

class Observer:

    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)

    def run_episode(self, agent_rref):
        state, ep_reward = self.env.reset(), 0
        for _ in range(10000):
            # send the state to the agent to get an action
            action = agent_rref.rpc_sync().select_action(self.id, state)

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)

            # report the reward to the agent for training purpose
            agent_rref.rpc_sync().report_reward(self.id, reward)

            # finishes after the number of self.env._max_episode_steps
            if done:
                break

Agent 的程式碼稍微複雜,我們會將其拆解成多個部分。在這個範例中,agent 同時作為訓練器和主控端,它會將命令傳送給多個分散式觀察者以執行 episodes,並且也會在本地記錄所有動作和獎勵,這些資訊將在每個 episode 之後的訓練階段中使用。下面的程式碼展示了 Agent 建構子,其中大部分的程式碼行都是在初始化各種元件。最後的迴圈會在其他 workers 上遠端初始化觀察者,並在本地持有這些觀察者的 RRefs。agent 稍後會使用這些觀察者的 RRefs 來傳送命令。應用程式不需要擔心 RRefs 的生命週期。每個 RRef 的擁有者都會維護一個引用計數映射來追蹤其生命週期,並保證只要有任何該 RRef 的作用使用者存在,遠端的資料物件就不會被刪除。請參考 RRef設計文件 以獲取更多詳細資訊。

import gym
import numpy as np

import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical

class Agent:
    def __init__(self, world_size):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.policy = Policy()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

接下來,agent 暴露了兩個 API 給觀察者,用於選擇動作和報告獎勵。這些函數只會在 agent 本機上執行,但會由觀察者透過 RPC 觸發。

class Agent:
    ...
    def select_action(self, ob_id, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward)

讓我們在 agent 上新增一個 run_episode 函數,它會告訴所有觀察者執行一個 episode。在這個函數中,它首先建立一個清單來收集來自非同步 RPC 的 futures,然後迴圈遍歷所有觀察者的 RRefs 以進行非同步 RPC。在這些 RPC 中,agent 也將自身的一個 RRef 傳遞給觀察者,以便觀察者也可以呼叫 agent 上的函數。如上所示,每個觀察者都會向 agent 發出 RPC,這些 RPC 是巢狀 RPC。在每個 episode 之後,saved_log_probsrewards 將包含記錄的動作機率和獎勵。

class Agent:
    ...
    def run_episode(self):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    ob_rref.rpc_sync().run_episode,
                    args=(self.agent_rref,)
                )
            )

        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait()

最後,在一個 episode 之後,agent 需要訓練模型,這是在下面的 finish_episode 函數中實作的。這個函數中沒有 RPC,而且大部分是從單執行緒的 範例 中借用的。因此,我們跳過對其內容的描述。

class Agent:
    ...
    def finish_episode(self):
      # joins probs and rewards from different observers into lists
      R, probs, rewards = 0, [], []
      for ob_id in self.rewards:
          probs.extend(self.saved_log_probs[ob_id])
          rewards.extend(self.rewards[ob_id])

      # use the minimum observer reward to calculate the running reward
      min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
      self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward

      # clear saved probs and rewards
      for ob_id in self.rewards:
          self.rewards[ob_id] = []
          self.saved_log_probs[ob_id] = []

      policy_loss, returns = [], []
      for r in rewards[::-1]:
          R = r + args.gamma * R
          returns.insert(0, R)
      returns = torch.tensor(returns)
      returns = (returns - returns.mean()) / (returns.std() + self.eps)
      for log_prob, R in zip(probs, returns):
          policy_loss.append(-log_prob * R)
      self.optimizer.zero_grad()
      policy_loss = torch.cat(policy_loss).sum()
      policy_loss.backward()
      self.optimizer.step()
      return min_reward

有了 PolicyObserverAgent 類別,我們就可以啟動多個進程來執行分散式訓練。在這個範例中,所有進程都執行相同的 run_worker 函數,並且它們使用 rank 來區分它們的角色。Rank 0 永遠是 agent,而所有其他 rank 都是觀察者。Agent 作為 master,透過重複呼叫 run_episodefinish_episode,直到運行的獎勵超過環境指定的獎勵閾值。所有觀察者都被動地等待來自 agent 的命令。程式碼被 rpc.init_rpcrpc.shutdown 包裹,它們分別初始化和終止 RPC 實例。更多細節可在 API 頁面 中找到。

import os
from itertools import count

import torch.multiprocessing as mp

AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size)
        print(f"This will run until reward threshold of {agent.reward_threshold}"
                " is reached. Ctrl+C to exit.")
        for i_episode in count(1):
            agent.run_episode()
            last_reward = agent.finish_episode()

            if i_episode % args.log_interval == 0:
                print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
                    f"{agent.running_reward:.2f}")
            if agent.running_reward > agent.reward_threshold:
                print(f"Solved! Running reward is now {agent.running_reward}!")
                break
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from the agent

    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()


mp.spawn(
    run_worker,
    args=(args.world_size, ),
    nprocs=args.world_size,
    join=True
)

以下是一些使用 world_size=2 進行訓練時的範例輸出。

This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10      Last reward: 26.00      Average reward: 10.01
Episode 20      Last reward: 16.00      Average reward: 11.27
Episode 30      Last reward: 49.00      Average reward: 18.62
Episode 40      Last reward: 45.00      Average reward: 26.09
Episode 50      Last reward: 44.00      Average reward: 30.03
Episode 60      Last reward: 111.00     Average reward: 42.23
Episode 70      Last reward: 131.00     Average reward: 70.11
Episode 80      Last reward: 87.00      Average reward: 76.51
Episode 90      Last reward: 86.00      Average reward: 95.93
Episode 100     Last reward: 13.00      Average reward: 123.93
Episode 110     Last reward: 33.00      Average reward: 91.39
Episode 120     Last reward: 73.00      Average reward: 76.38
Episode 130     Last reward: 137.00     Average reward: 88.08
Episode 140     Last reward: 89.00      Average reward: 104.96
Episode 150     Last reward: 97.00      Average reward: 98.74
Episode 160     Last reward: 150.00     Average reward: 100.87
Episode 170     Last reward: 126.00     Average reward: 104.38
Episode 180     Last reward: 500.00     Average reward: 213.74
Episode 190     Last reward: 322.00     Average reward: 300.22
Episode 200     Last reward: 165.00     Average reward: 272.71
Episode 210     Last reward: 168.00     Average reward: 233.11
Episode 220     Last reward: 184.00     Average reward: 195.02
Episode 230     Last reward: 284.00     Average reward: 208.32
Episode 240     Last reward: 395.00     Average reward: 247.37
Episode 250     Last reward: 500.00     Average reward: 335.42
Episode 260     Last reward: 500.00     Average reward: 386.30
Episode 270     Last reward: 500.00     Average reward: 405.29
Episode 280     Last reward: 500.00     Average reward: 443.29
Episode 290     Last reward: 500.00     Average reward: 464.65
Solved! Running reward is now 475.3163778435275!

在這個範例中,我們展示了如何使用 RPC 作為通訊工具來跨 workers 傳遞資料,以及如何使用 RRef 來引用遠端物件。確實,您可以直接在 ProcessGroupsendrecv API 之上構建整個結構,或者使用其他通訊/RPC 函式庫。但是,透過使用 torch.distributed.rpc,您可以獲得原生支援以及底層持續優化的效能。

接下來,我們將展示如何將 RPC 和 RRef 與分散式 autograd 和分散式 optimizer 結合使用,以執行分散式模型並行訓練。

使用分散式 Autograd 和分散式 Optimizer 的分散式 RNN

在本節中,我們使用 RNN 模型來展示如何使用 RPC API 建構分散式模型並行訓練。範例 RNN 模型非常小,可以輕鬆地放入單個 GPU 中,但我們仍然將其層劃分到兩個不同的 workers 上來演示這個概念。開發人員可以應用類似的技術來跨多個裝置和機器分佈更大的模型。

RNN 模型設計是從 PyTorch 範例 儲存庫中的單字語言模型借用的,它包含三個主要元件:一個 embedding table、一個 LSTM 層和一個 decoder。下面的程式碼將 embedding table 和 decoder 包裹到子模組中,以便它們的建構子可以傳遞給 RPC API。在 EmbeddingTable 子模組中,我們故意將 Embedding 層放在 GPU 上以涵蓋使用案例。在 v1.4 中,RPC 始終在目標 worker 上建立 CPU 張量參數或傳回值。如果函數採用 GPU 張量,則需要明確地將其移動到適當的裝置。

class EmbeddingTable(nn.Module):
    r"""
    Encoding layers of the RNNModel
    """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, input):
        return self.drop(self.encoder(input.cuda()).cpu()


class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, output):
        return self.decoder(self.drop(output))

有了上述的子模組,我們現在可以使用 RPC 將它們組合在一起,創建一個 RNN 模型。在下面的程式碼中,ps 代表一個參數伺服器(parameter server),它託管了嵌入表(embedding table)和解碼器(decoder)的參數。建構子使用 remote API 在參數伺服器上建立一個 EmbeddingTable 物件和一個 Decoder 物件,並在本地建立 LSTM 子模組。在前向傳遞(forward pass)過程中,訓練器(trainer)使用 EmbeddingTableRRef 來找到遠端的子模組,並使用 RPC 將輸入數據傳遞給 EmbeddingTable,然後獲取查找結果。接著,它將嵌入(embedding)通過本地的 LSTM 層,最後使用另一個 RPC 將輸出發送到 Decoder 子模組。一般來說,為了實現分散式模型平行訓練,開發人員可以將模型分成子模組,調用 RPC 遠端建立子模組實例,並在必要時使用 RRef 找到它們。正如您在下面的程式碼中所見,它看起來非常類似於單機模型平行訓練。主要的區別是將 Tensor.to(device) 替換為 RPC 函式。

class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()

        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))

    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the rremote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden

在介紹分散式優化器(distributed optimizer)之前,讓我們先添加一個輔助函式來產生模型參數的 RRef 列表,這些 RRef 將被分散式優化器使用。在本地訓練中,應用程式可以調用 Module.parameters() 來獲取所有參數張量(parameter tensors)的參考,並將其傳遞給本地優化器以進行後續更新。但是,相同的 API 在分散式訓練場景中不起作用,因為某些參數位於遠端機器上。因此,分散式優化器不是採用參數 Tensors 的列表,而是採用 RRefs 的列表,每個模型參數對應一個 RRef,適用於本地和遠端模型參數。輔助函式非常簡單,只需調用 Module.parameters() 並在每個參數上創建一個本地的 RRef 即可。

def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs

然後,由於 RNNModel 包含三個子模組,我們需要調用 _parameter_rrefs 三次,並將其包裝到另一個輔助函式中。

class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params

現在,我們準備好實現訓練迴圈了。在初始化模型參數後,我們建立 RNNModelDistributedOptimizer。分散式優化器將取得參數 RRefs 的列表,找到所有不同的擁有者 worker,並使用給定的參數(即 lr=0.05)在每個擁有者 worker 上建立給定的本地優化器(即,在此示例中為 SGD,您也可以使用其他本地優化器)。

在訓練迴圈中,它首先建立一個分散式自動微分(autograd)上下文,這將有助於分散式自動微分引擎找到梯度和涉及的 RPC 發送/接收函式。分散式自動微分引擎的設計細節可以在其設計說明中找到。然後,它啟動前向傳遞,就像它是本地模型一樣,並運行分散式反向傳遞(backward pass)。對於分散式反向傳遞,您只需要指定一個根(roots)的列表,在本例中,它是損失 Tensor。分散式自動微分引擎將自動遍歷分散式圖,並正確地寫入梯度。接下來,它在分散式優化器上運行 step 函式,該函式將聯絡所有涉及的本地優化器以更新模型參數。與本地訓練相比,一個小的差異是您不需要運行 zero_grad(),因為每個自動微分上下文都有專用的空間來儲存梯度,並且由於我們每次迭代都會建立一個上下文,因此來自不同迭代的梯度不會累積到同一組 Tensors 中。

def run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2

    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )

    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)

    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target

    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run distributed optimizer
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch))

最後,讓我們添加一些膠水程式碼來啟動參數伺服器和訓練器進程。

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        _run_trainer()
    else:
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

文件

取得 PyTorch 的完整開發人員文件

查看文件

教學

取得初學者和進階開發人員的深入教學課程

查看教學課程

資源

尋找開發資源並獲得您的問題解答

查看資源