分散式 RPC 框架入門¶
建立於:2020 年 1 月 1 日 | 最後更新:2025 年 1 月 21 日 | 最後驗證:2024 年 11 月 5 日
作者:Shen Li
注意
在 github 中檢視和編輯本教學。
先決條件
本教學使用兩個簡單的範例來示範如何使用 torch.distributed.rpc 套件建立分散式訓練,該套件最初在 PyTorch v1.4 中作為實驗性功能推出。 這兩個範例的原始碼可以在 PyTorch 範例中找到。
之前的教學課程,分散式數據平行入門 和 使用 PyTorch 編寫分散式應用程式,描述了 DistributedDataParallel,它支援一種特定的訓練範例,其中模型在多個進程中複製,並且每個進程處理輸入數據的分割。 有時,您可能會遇到需要不同訓練範例的情況。 例如
在強化學習中,從環境中獲取訓練數據可能相對昂貴,而模型本身可能很小。 在這種情況下,產生多個並行運行的觀察器並共享單一代理可能很有用。 在這種情況下,代理在本地負責訓練,但應用程式仍然需要函式庫來在觀察器和訓練器之間發送和接收數據。
您的模型可能太大而無法放入單台機器上的 GPU 中,因此需要一個函式庫來幫助將模型拆分到多台機器上。 或者您可能正在實現一個 參數伺服器 訓練框架,其中模型參數和訓練器位於不同的機器上。
torch.distributed.rpc 套件可以幫助處理上述情況。 在情況 1 中,RPC 和 RRef 允許將數據從一個工作人員發送到另一個工作人員,同時輕鬆引用遠端數據物件。 在情況 2 中,分散式自動梯度 和 分散式優化器 使執行反向傳播和優化器步驟就像本地訓練一樣。 在接下來的兩個章節中,我們將使用強化學習範例和語言模型範例來示範 torch.distributed.rpc 的 API。 請注意,本教學不旨在建立最準確或最有效的模型來解決給定的問題,相反,這裡的主要目標是展示如何使用 torch.distributed.rpc 套件來建立分散式訓練應用程式。
使用 RPC 和 RRef 的分散式強化學習¶
本節描述使用 RPC 建立玩具分散式強化學習模型以從 OpenAI Gym 解決 CartPole-v1 的步驟。 策略程式碼主要借用自現有的單線程 範例,如下所示。 我們將跳過 Policy
設計的詳細資訊,並專注於 RPC 用法。
import torch.nn as nn
import torch.nn.functional as F
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=1)
我們準備好展示觀察器。 在此範例中,每個觀察器建立自己的環境,並等待代理的命令來運行一個 episode。 在每個 episode 中,一個觀察器最多循環 n_steps
次迭代,並且在每次迭代中,它使用 RPC 將其環境狀態傳遞給代理並取回一個動作。 然後,它將該動作應用於其環境,並從環境中獲得獎勵和下一個狀態。 之後,觀察器使用另一個 RPC 向代理報告獎勵。 再次,請注意,這顯然不是最有效的觀察器實現。 例如,一種簡單的優化可能是將當前狀態和最後一次獎勵打包在一個 RPC 中,以減少通信開銷。 但是,目標是示範 RPC API,而不是為 CartPole 建立最佳解算器。 因此,讓我們保持邏輯簡單,並在此範例中明確兩個步驟。
import argparse
import gym
import torch.distributed.rpc as rpc
parser = argparse.ArgumentParser(
description="RPC Reinforcement Learning Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--world_size', default=2, type=int, metavar='W',
help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed for reproducibility')
args = parser.parse_args()
class Observer:
def __init__(self):
self.id = rpc.get_worker_info().id
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
def run_episode(self, agent_rref):
state, ep_reward = self.env.reset(), 0
for _ in range(10000):
# send the state to the agent to get an action
action = agent_rref.rpc_sync().select_action(self.id, state)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
# report the reward to the agent for training purpose
agent_rref.rpc_sync().report_reward(self.id, reward)
# finishes after the number of self.env._max_episode_steps
if done:
break
Agent 的程式碼稍微複雜,我們會將其拆解成多個部分。在這個範例中,agent 同時作為訓練器和主控端,它會將命令傳送給多個分散式觀察者以執行 episodes,並且也會在本地記錄所有動作和獎勵,這些資訊將在每個 episode 之後的訓練階段中使用。下面的程式碼展示了 Agent
建構子,其中大部分的程式碼行都是在初始化各種元件。最後的迴圈會在其他 workers 上遠端初始化觀察者,並在本地持有這些觀察者的 RRefs
。agent 稍後會使用這些觀察者的 RRefs
來傳送命令。應用程式不需要擔心 RRefs
的生命週期。每個 RRef
的擁有者都會維護一個引用計數映射來追蹤其生命週期,並保證只要有任何該 RRef
的作用使用者存在,遠端的資料物件就不會被刪除。請參考 RRef
的 設計文件 以獲取更多詳細資訊。
import gym
import numpy as np
import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
class Agent:
def __init__(self, world_size):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.saved_log_probs = {}
self.policy = Policy()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.eps = np.finfo(np.float32).eps.item()
self.running_reward = 0
self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(remote(ob_info, Observer))
self.rewards[ob_info.id] = []
self.saved_log_probs[ob_info.id] = []
接下來,agent 暴露了兩個 API 給觀察者,用於選擇動作和報告獎勵。這些函數只會在 agent 本機上執行,但會由觀察者透過 RPC 觸發。
class Agent:
...
def select_action(self, ob_id, state):
state = torch.from_numpy(state).float().unsqueeze(0)
probs = self.policy(state)
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
def report_reward(self, ob_id, reward):
self.rewards[ob_id].append(reward)
讓我們在 agent 上新增一個 run_episode
函數,它會告訴所有觀察者執行一個 episode。在這個函數中,它首先建立一個清單來收集來自非同步 RPC 的 futures,然後迴圈遍歷所有觀察者的 RRefs
以進行非同步 RPC。在這些 RPC 中,agent 也將自身的一個 RRef
傳遞給觀察者,以便觀察者也可以呼叫 agent 上的函數。如上所示,每個觀察者都會向 agent 發出 RPC,這些 RPC 是巢狀 RPC。在每個 episode 之後,saved_log_probs
和 rewards
將包含記錄的動作機率和獎勵。
class Agent:
...
def run_episode(self):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(
rpc_async(
ob_rref.owner(),
ob_rref.rpc_sync().run_episode,
args=(self.agent_rref,)
)
)
# wait until all obervers have finished this episode
for fut in futs:
fut.wait()
最後,在一個 episode 之後,agent 需要訓練模型,這是在下面的 finish_episode
函數中實作的。這個函數中沒有 RPC,而且大部分是從單執行緒的 範例 中借用的。因此,我們跳過對其內容的描述。
class Agent:
...
def finish_episode(self):
# joins probs and rewards from different observers into lists
R, probs, rewards = 0, [], []
for ob_id in self.rewards:
probs.extend(self.saved_log_probs[ob_id])
rewards.extend(self.rewards[ob_id])
# use the minimum observer reward to calculate the running reward
min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward
# clear saved probs and rewards
for ob_id in self.rewards:
self.rewards[ob_id] = []
self.saved_log_probs[ob_id] = []
policy_loss, returns = [], []
for r in rewards[::-1]:
R = r + args.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + self.eps)
for log_prob, R in zip(probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
return min_reward
有了 Policy
、Observer
和 Agent
類別,我們就可以啟動多個進程來執行分散式訓練。在這個範例中,所有進程都執行相同的 run_worker
函數,並且它們使用 rank 來區分它們的角色。Rank 0 永遠是 agent,而所有其他 rank 都是觀察者。Agent 作為 master,透過重複呼叫 run_episode
和 finish_episode
,直到運行的獎勵超過環境指定的獎勵閾值。所有觀察者都被動地等待來自 agent 的命令。程式碼被 rpc.init_rpc 和 rpc.shutdown 包裹,它們分別初始化和終止 RPC 實例。更多細節可在 API 頁面 中找到。
import os
from itertools import count
import torch.multiprocessing as mp
AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size)
print(f"This will run until reward threshold of {agent.reward_threshold}"
" is reached. Ctrl+C to exit.")
for i_episode in count(1):
agent.run_episode()
last_reward = agent.finish_episode()
if i_episode % args.log_interval == 0:
print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
f"{agent.running_reward:.2f}")
if agent.running_reward > agent.reward_threshold:
print(f"Solved! Running reward is now {agent.running_reward}!")
break
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from the agent
# block until all rpcs finish, and shutdown the RPC instance
rpc.shutdown()
mp.spawn(
run_worker,
args=(args.world_size, ),
nprocs=args.world_size,
join=True
)
以下是一些使用 world_size=2 進行訓練時的範例輸出。
This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10 Last reward: 26.00 Average reward: 10.01
Episode 20 Last reward: 16.00 Average reward: 11.27
Episode 30 Last reward: 49.00 Average reward: 18.62
Episode 40 Last reward: 45.00 Average reward: 26.09
Episode 50 Last reward: 44.00 Average reward: 30.03
Episode 60 Last reward: 111.00 Average reward: 42.23
Episode 70 Last reward: 131.00 Average reward: 70.11
Episode 80 Last reward: 87.00 Average reward: 76.51
Episode 90 Last reward: 86.00 Average reward: 95.93
Episode 100 Last reward: 13.00 Average reward: 123.93
Episode 110 Last reward: 33.00 Average reward: 91.39
Episode 120 Last reward: 73.00 Average reward: 76.38
Episode 130 Last reward: 137.00 Average reward: 88.08
Episode 140 Last reward: 89.00 Average reward: 104.96
Episode 150 Last reward: 97.00 Average reward: 98.74
Episode 160 Last reward: 150.00 Average reward: 100.87
Episode 170 Last reward: 126.00 Average reward: 104.38
Episode 180 Last reward: 500.00 Average reward: 213.74
Episode 190 Last reward: 322.00 Average reward: 300.22
Episode 200 Last reward: 165.00 Average reward: 272.71
Episode 210 Last reward: 168.00 Average reward: 233.11
Episode 220 Last reward: 184.00 Average reward: 195.02
Episode 230 Last reward: 284.00 Average reward: 208.32
Episode 240 Last reward: 395.00 Average reward: 247.37
Episode 250 Last reward: 500.00 Average reward: 335.42
Episode 260 Last reward: 500.00 Average reward: 386.30
Episode 270 Last reward: 500.00 Average reward: 405.29
Episode 280 Last reward: 500.00 Average reward: 443.29
Episode 290 Last reward: 500.00 Average reward: 464.65
Solved! Running reward is now 475.3163778435275!
在這個範例中,我們展示了如何使用 RPC 作為通訊工具來跨 workers 傳遞資料,以及如何使用 RRef 來引用遠端物件。確實,您可以直接在 ProcessGroup
的 send
和 recv
API 之上構建整個結構,或者使用其他通訊/RPC 函式庫。但是,透過使用 torch.distributed.rpc,您可以獲得原生支援以及底層持續優化的效能。
接下來,我們將展示如何將 RPC 和 RRef 與分散式 autograd 和分散式 optimizer 結合使用,以執行分散式模型並行訓練。
使用分散式 Autograd 和分散式 Optimizer 的分散式 RNN¶
在本節中,我們使用 RNN 模型來展示如何使用 RPC API 建構分散式模型並行訓練。範例 RNN 模型非常小,可以輕鬆地放入單個 GPU 中,但我們仍然將其層劃分到兩個不同的 workers 上來演示這個概念。開發人員可以應用類似的技術來跨多個裝置和機器分佈更大的模型。
RNN 模型設計是從 PyTorch 範例 儲存庫中的單字語言模型借用的,它包含三個主要元件:一個 embedding table、一個 LSTM
層和一個 decoder。下面的程式碼將 embedding table 和 decoder 包裹到子模組中,以便它們的建構子可以傳遞給 RPC API。在 EmbeddingTable
子模組中,我們故意將 Embedding
層放在 GPU 上以涵蓋使用案例。在 v1.4 中,RPC 始終在目標 worker 上建立 CPU 張量參數或傳回值。如果函數採用 GPU 張量,則需要明確地將其移動到適當的裝置。
class EmbeddingTable(nn.Module):
r"""
Encoding layers of the RNNModel
"""
def __init__(self, ntoken, ninp, dropout):
super(EmbeddingTable, self).__init__()
self.drop = nn.Dropout(dropout)
self.encoder = nn.Embedding(ntoken, ninp).cuda()
self.encoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, input):
return self.drop(self.encoder(input.cuda()).cpu()
class Decoder(nn.Module):
def __init__(self, ntoken, nhid, dropout):
super(Decoder, self).__init__()
self.drop = nn.Dropout(dropout)
self.decoder = nn.Linear(nhid, ntoken)
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, output):
return self.decoder(self.drop(output))
有了上述的子模組,我們現在可以使用 RPC 將它們組合在一起,創建一個 RNN 模型。在下面的程式碼中,ps
代表一個參數伺服器(parameter server),它託管了嵌入表(embedding table)和解碼器(decoder)的參數。建構子使用 remote API 在參數伺服器上建立一個 EmbeddingTable
物件和一個 Decoder
物件,並在本地建立 LSTM
子模組。在前向傳遞(forward pass)過程中,訓練器(trainer)使用 EmbeddingTable
的 RRef
來找到遠端的子模組,並使用 RPC 將輸入數據傳遞給 EmbeddingTable
,然後獲取查找結果。接著,它將嵌入(embedding)通過本地的 LSTM
層,最後使用另一個 RPC 將輸出發送到 Decoder
子模組。一般來說,為了實現分散式模型平行訓練,開發人員可以將模型分成子模組,調用 RPC 遠端建立子模組實例,並在必要時使用 RRef
找到它們。正如您在下面的程式碼中所見,它看起來非常類似於單機模型平行訓練。主要的區別是將 Tensor.to(device)
替換為 RPC 函式。
class RNNModel(nn.Module):
def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
super(RNNModel, self).__init__()
# setup embedding table remotely
self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
# setup LSTM locally
self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
# setup decoder remotely
self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))
def forward(self, input, hidden):
# pass input to the remote embedding table and fetch emb tensor back
emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
output, hidden = self.rnn(emb, hidden)
# pass output to the rremote decoder and get the decoded output back
decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
return decoded, hidden
在介紹分散式優化器(distributed optimizer)之前,讓我們先添加一個輔助函式來產生模型參數的 RRef 列表,這些 RRef 將被分散式優化器使用。在本地訓練中,應用程式可以調用 Module.parameters()
來獲取所有參數張量(parameter tensors)的參考,並將其傳遞給本地優化器以進行後續更新。但是,相同的 API 在分散式訓練場景中不起作用,因為某些參數位於遠端機器上。因此,分散式優化器不是採用參數 Tensors
的列表,而是採用 RRefs
的列表,每個模型參數對應一個 RRef
,適用於本地和遠端模型參數。輔助函式非常簡單,只需調用 Module.parameters()
並在每個參數上創建一個本地的 RRef
即可。
def _parameter_rrefs(module):
param_rrefs = []
for param in module.parameters():
param_rrefs.append(RRef(param))
return param_rrefs
然後,由於 RNNModel
包含三個子模組,我們需要調用 _parameter_rrefs
三次,並將其包裝到另一個輔助函式中。
class RNNModel(nn.Module):
...
def parameter_rrefs(self):
remote_params = []
# get RRefs of embedding table
remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
# create RRefs for local parameters
remote_params.extend(_parameter_rrefs(self.rnn))
# get RRefs of decoder
remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
return remote_params
現在,我們準備好實現訓練迴圈了。在初始化模型參數後,我們建立 RNNModel
和 DistributedOptimizer
。分散式優化器將取得參數 RRefs
的列表,找到所有不同的擁有者 worker,並使用給定的參數(即 lr=0.05
)在每個擁有者 worker 上建立給定的本地優化器(即,在此示例中為 SGD
,您也可以使用其他本地優化器)。
在訓練迴圈中,它首先建立一個分散式自動微分(autograd)上下文,這將有助於分散式自動微分引擎找到梯度和涉及的 RPC 發送/接收函式。分散式自動微分引擎的設計細節可以在其設計說明中找到。然後,它啟動前向傳遞,就像它是本地模型一樣,並運行分散式反向傳遞(backward pass)。對於分散式反向傳遞,您只需要指定一個根(roots)的列表,在本例中,它是損失 Tensor
。分散式自動微分引擎將自動遍歷分散式圖,並正確地寫入梯度。接下來,它在分散式優化器上運行 step
函式,該函式將聯絡所有涉及的本地優化器以更新模型參數。與本地訓練相比,一個小的差異是您不需要運行 zero_grad()
,因為每個自動微分上下文都有專用的空間來儲存梯度,並且由於我們每次迭代都會建立一個上下文,因此來自不同迭代的梯度不會累積到同一組 Tensors
中。
def run_trainer():
batch = 5
ntoken = 10
ninp = 2
nhid = 3
nindices = 3
nlayers = 4
hidden = (
torch.randn(nlayers, nindices, nhid),
torch.randn(nlayers, nindices, nhid)
)
model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)
# setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model.parameter_rrefs(),
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
def get_next_batch():
for _ in range(5):
data = torch.LongTensor(batch, nindices) % ntoken
target = torch.LongTensor(batch, ntoken) % nindices
yield data, target
# train for 10 iterations
for epoch in range(10):
for data, target in get_next_batch():
# create distributed autograd context
with dist_autograd.context() as context_id:
hidden[0].detach_()
hidden[1].detach_()
output, hidden = model(data, hidden)
loss = criterion(output, target)
# run distributed backward pass
dist_autograd.backward(context_id, [loss])
# run distributed optimizer
opt.step(context_id)
# not necessary to zero grads since they are
# accumulated into the distributed autograd context
# which is reset every iteration.
print("Training epoch {}".format(epoch))
最後,讓我們添加一些膠水程式碼來啟動參數伺服器和訓練器進程。
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 1:
rpc.init_rpc("trainer", rank=rank, world_size=world_size)
_run_trainer()
else:
rpc.init_rpc("ps", rank=rank, world_size=world_size)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)