使用分散式 RPC 框架實作參數伺服器¶
建立於:2020 年 4 月 6 日 | 最後更新:2024 年 5 月 7 日 | 最後驗證:未驗證
作者: Rohan Varma
注意
在 github 中檢視和編輯本教學。
先決條件
本教學逐步介紹使用 PyTorch 的 分散式 RPC 框架 實作參數伺服器的簡單範例。 參數伺服器框架是一種範例,其中一組伺服器儲存參數(例如大型嵌入表),並且多個訓練器查詢參數伺服器以檢索最新的參數。 這些訓練器可以在本地運行訓練迴圈,並偶爾與參數伺服器同步以獲取最新的參數。 有關參數伺服器方法的更多閱讀,請查看 本文。
使用分散式 RPC 框架,我們將建立一個範例,其中多個訓練器使用 RPC 與同一個參數伺服器進行通訊,並使用 RRef 存取遠端參數伺服器實例上的狀態。 每個訓練器都將透過使用分散式 autograd 跨多個節點縫合 autograd 圖的方式,以分散式方式啟動其專用的反向傳遞。
注意:本教學涵蓋分散式 RPC 框架的使用,該框架對於將模型分割到多台機器上,或對於實作參數伺服器訓練策略(其中網路訓練器獲取託管在不同機器上的參數)很有用。 如果您正在尋找跨多個 GPU 複製模型,請參閱 分散式數據平行教學。 還有另一個 RPC 教學,涵蓋了強化學習和 RNN 用例。
讓我們從熟悉的開始:匯入我們需要的模組並定義一個簡單的 ConvNet,它將在 MNIST 數據集上進行訓練。 下面的網路很大程度上採用了 pytorch/examples repo 中定義的網路。
import argparse
import os
import time
from threading import Lock
import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.distributed.optim import DistributedOptimizer
from torchvision import datasets, transforms
# --------- MNIST Network to train, from pytorch/examples -----
class Net(nn.Module):
def __init__(self, num_gpus=0):
super(Net, self).__init__()
print(f"Using {num_gpus} GPUs to train")
self.num_gpus = num_gpus
device = torch.device(
"cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu")
print(f"Putting first 2 convs on {str(device)}")
# Put conv layers on the first cuda device, or CPU if no cuda device
self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device)
self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device)
# Put rest of the network on the 2nd cuda device, if there is one
if "cuda" in str(device) and num_gpus > 1:
device = torch.device("cuda:1")
print(f"Putting rest of layers on {str(device)}")
self.dropout1 = nn.Dropout2d(0.25).to(device)
self.dropout2 = nn.Dropout2d(0.5).to(device)
self.fc1 = nn.Linear(9216, 128).to(device)
self.fc2 = nn.Linear(128, 10).to(device)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
# Move tensor to next device if necessary
next_device = next(self.fc1.parameters()).device
x = x.to(next_device)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
接下來,讓我們定義一些有助於我們腳本其餘部分使用的輔助函數。 以下使用 rpc_sync 和 RRef 來定義一個函數,該函數在位於遠端節點上的物件上調用給定的方法。 在下面,我們對遠端物件的處理由 rref
參數給出,我們在其擁有節點上運行它:rref.owner()
。 在呼叫者節點上,我們透過使用 rpc_sync
同步運行此命令,這意味著我們將阻塞直到收到回應。
# --------- Helper Methods --------------------
# On the local node, call a method with first arg as the value held by the
# RRef. Other args are passed in as arguments to the function called.
# Useful for calling instance methods. method could be any matching function, including
# class methods.
def call_method(method, rref, *args, **kwargs):
return method(rref.local_value(), *args, **kwargs)
# Given an RRef, return the result of calling the passed in method on the value
# held by the RRef. This call is done on the remote node that owns
# the RRef and passes along the given argument.
# Example: If the value held by the RRef is of type Foo, then
# remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling
# <foo_instance>.bar(arg1, arg2) on the remote node and getting the result
# back.
def remote_method(method, rref, *args, **kwargs):
args = [method, rref] + list(args)
return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)
現在,我們準備好定義我們的參數伺服器。 我們將對 nn.Module
進行子類別化,並儲存對上面定義的網路的處理。 我們還將儲存一個輸入設備,該設備將是我們的輸入在調用模型之前傳輸到的設備。
# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
model = Net(num_gpus=num_gpus)
self.model = model
self.input_device = torch.device(
"cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")
接下來,我們將定義我們的正向傳遞。 請注意,無論模型輸出的設備如何,我們都會將輸出移動到 CPU,因為分散式 RPC 框架目前僅支援透過 RPC 發送 CPU 張量。 由於呼叫者/被呼叫者上可能存在不同的設備 (CPU/GPU),因此我們有意禁用透過 RPC 發送 CUDA 張量,但在未來的版本中可能會支援。
class ParameterServer(nn.Module):
...
def forward(self, inp):
inp = inp.to(self.input_device)
out = self.model(inp)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
out = out.to("cpu")
return out
接下來,我們將定義幾個有助於訓練和驗證的雜項函數。第一個是 get_dist_gradients
,它會接收一個 Distributed Autograd context ID,並呼叫 dist_autograd.get_gradients
API,以檢索由分散式自動微分計算的梯度。更多資訊可以在分散式自動微分文件中找到。請注意,我們也會迭代結果字典,並將每個張量轉換為 CPU 張量,因為框架目前僅支援透過 RPC 發送張量。接下來,get_param_rrefs
將迭代我們的模型參數,並將它們包裝為 (本地) RRef。此方法將由訓練器節點透過 RPC 呼叫,並將傳回要優化的參數列表。這是 Distributed Optimizer 的必要輸入,它要求所有要優化的參數都作為 RRef
的列表。
# Use dist autograd to retrieve gradients accumulated for this model.
# Primarily used for verification.
def get_dist_gradients(self, cid):
grads = dist_autograd.get_gradients(cid)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
cpu_grads = {}
for k, v in grads.items():
k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
cpu_grads[k_cpu] = v_cpu
return cpu_grads
# Wrap local parameters in a RRef. Needed for building the
# DistributedOptimizer which optimizes paramters remotely.
def get_param_rrefs(self):
param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
return param_rrefs
最後,我們將建立初始化參數伺服器的方法。請注意,在所有進程中只會有一個參數伺服器的實例,並且所有訓練器都將與同一個參數伺服器通訊並更新相同的儲存模型。如 run_parameter_server
中所見,伺服器本身不採取任何獨立的動作;它等待來自訓練器(尚未定義)的請求,並透過執行請求的函數來回應它們。
# The global parameter server instance.
param_server = None
# A lock to ensure we only have one parameter server.
global_lock = Lock()
def get_parameter_server(num_gpus=0):
"""
Returns a singleton parameter server to all trainer processes
"""
global param_server
# Ensure that we get only one handle to the ParameterServer.
with global_lock:
if not param_server:
# construct it once
param_server = ParameterServer(num_gpus=num_gpus)
return param_server
def run_parameter_server(rank, world_size):
# The parameter server just acts as a host for the model and responds to
# requests from trainers.
# rpc.shutdown() will wait for all workers to complete by default, which
# in this case means that the parameter server will wait for all trainers
# to complete, and then exit.
print("PS master initializing RPC")
rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
print("RPC initialized! Running parameter server...")
rpc.shutdown()
print("RPC shutdown on parameter server.")
請注意,在上面,rpc.shutdown()
不會立即關閉參數伺服器。相反,它將等待所有 worker(在本例中為訓練器)也呼叫 rpc.shutdown()
。這保證了參數伺服器不會在所有訓練器(尚未定義)完成其訓練過程之前離線。
接下來,我們將定義我們的 TrainerNet
類別。這也將是 nn.Module
的子類別,並且我們的 __init__
方法將使用 rpc.remote
API 來取得對我們的參數伺服器的 RRef(或 Remote Reference)。請注意,在這裡我們沒有將參數伺服器複製到我們的本地進程,相反,我們可以將 self.param_server_rref
視為指向在單獨進程上執行的參數伺服器的分散式共享指標。
# --------- Trainers --------------------
# nn.Module corresponding to the network trained by this trainer. The
# forward() method simply invokes the network on the given parameter
# server.
class TrainerNet(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
self.num_gpus = num_gpus
self.param_server_rref = rpc.remote(
"parameter_server", get_parameter_server, args=(num_gpus,))
接下來,我們將定義一個名為 get_global_param_rrefs
的方法。為了激勵對此方法的需求,值得閱讀關於 DistributedOptimizer 的文件,特別是 API 簽章。優化器必須傳遞一個 RRef
列表,這些 RRef 對應於要優化的遠端參數,因此在這裡我們獲得必要的 RRef
。由於給定的 TrainerNet
與之交互的唯一遠端 worker 是 ParameterServer
,我們只需在 ParameterServer
上呼叫一個 remote_method
。我們使用在 ParameterServer
類別中定義的 get_param_rrefs
方法。此方法將傳回一個 RRef
列表,這些 RRef 指向需要優化的參數。請注意,在這種情況下,我們的 TrainerNet
沒有定義自己的參數;如果它定義了自己的參數,我們也需要將每個參數包裝在一個 RRef
中,並將其包含在我們的 DistributedOptimizer
的輸入中。
class TrainerNet(nn.Module):
...
def get_global_param_rrefs(self):
remote_params = remote_method(
ParameterServer.get_param_rrefs,
self.param_server_rref)
return remote_params
現在,我們準備好定義我們的 forward
方法,它將呼叫(同步)RPC 來執行在 ParameterServer
上定義的網路的前向傳遞。請注意,我們傳入 self.param_server_rref
,這是我們 ParameterServer
的遠端句柄,到我們的 RPC 呼叫。此呼叫將向我們的 ParameterServer
正在執行的節點發送一個 RPC,呼叫 forward
傳遞,並傳回對應於模型輸出的 Tensor
。
class TrainerNet(nn.Module):
...
def forward(self, x):
model_output = remote_method(
ParameterServer.forward, self.param_server_rref, x)
return model_output
在我們的訓練器完全定義後,現在是時候編寫我們的神經網路訓練迴圈了,它將建立我們的網路和優化器,透過網路運行一些輸入並計算損失。訓練迴圈看起來很像本地訓練程式的迴圈,但由於我們的網路分散在多台機器上,因此做了一些修改。
在下面,我們初始化我們的 TrainerNet
並建立一個 DistributedOptimizer
。請注意,正如上面提到的,我們必須傳入所有我們想要優化的全域(跨所有參與分散式訓練的節點)參數。此外,我們傳入要使用的本地優化器,在本例中為 SGD。請注意,我們可以以與建立本地優化器相同的方式配置底層優化器演算法 - optimizer.SGD
的所有參數都將被正確轉發。作為一個範例,我們傳入一個自訂學習率,它將用作所有本地優化器的學習率。
def run_training_loop(rank, num_gpus, train_loader, test_loader):
# Runs the typical nueral network forward + backward + optimizer step, but
# in a distributed fashion.
net = TrainerNet(num_gpus=num_gpus)
# Build DistributedOptimizer.
param_rrefs = net.get_global_param_rrefs()
opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)
接下來,我們定義我們的主訓練迴圈。我們循環遍歷 PyTorch 的 DataLoader 給出的迭代器。在編寫我們典型的向前/向後/優化器迴圈之前,我們首先將邏輯包裝在 Distributed Autograd context 中。請注意,這是記錄在模型的前向傳遞中呼叫的 RPC 所必需的,以便可以建構一個適當的圖,其中包括向後傳遞中的所有參與分散式 worker。分散式自動微分 context 傳回一個 context_id
,它用作累積和優化對應於特定迭代的梯度的識別碼。
與呼叫典型的 loss.backward()
(這將在此本地 worker 上啟動向後傳遞)相反,我們呼叫 dist_autograd.backward()
並傳入我們的 context_id 以及 loss
,它是我們希望向後傳遞開始的根。此外,我們將此 context_id
傳遞到我們的優化器呼叫中,這是能夠查閱由跨所有節點的此特定向後傳遞計算的相應梯度所必需的。
def run_training_loop(rank, num_gpus, train_loader, test_loader):
...
for i, (data, target) in enumerate(train_loader):
with dist_autograd.context() as cid:
model_output = net(data)
target = target.to(model_output.device)
loss = F.nll_loss(model_output, target)
if i % 5 == 0:
print(f"Rank {rank} training batch {i} loss {loss.item()}")
dist_autograd.backward(cid, [loss])
# Ensure that dist autograd ran successfully and gradients were
# returned.
assert remote_method(
ParameterServer.get_dist_gradients,
net.param_server_rref,
cid) != {}
opt.step(cid)
print("Training complete!")
print("Getting accuracy....")
get_accuracy(test_loader, net)
以下內容僅僅是在我們完成訓練後計算我們模型的準確性,很像傳統的本地模型。但是,請注意,我們傳遞到上面這個函數中的 net
是 TrainerNet
的一個實例,因此前向傳遞以透明的方式呼叫 RPC。
def get_accuracy(test_loader, model):
model.eval()
correct_sum = 0
# Use GPU to evaluate if possible
device = torch.device("cuda:0" if model.num_gpus > 0
and torch.cuda.is_available() else "cpu")
with torch.no_grad():
for i, (data, target) in enumerate(test_loader):
out = model(data, -1)
pred = out.argmax(dim=1, keepdim=True)
pred, target = pred.to(device), target.to(device)
correct = pred.eq(target.view_as(pred)).sum().item()
correct_sum += correct
print(f"Accuracy {correct_sum / len(test_loader.dataset)}")
接下來,類似於我們如何將 run_parameter_server
定義為我們的 ParameterServer
的主迴圈,它負責初始化 RPC,讓我們為我們的訓練器定義一個類似的迴圈。不同之處在於,我們的訓練器必須運行我們上面定義的訓練迴圈。
# Main loop for trainers.
def run_worker(rank, world_size, num_gpus, train_loader, test_loader):
print(f"Worker rank {rank} initializing RPC")
rpc.init_rpc(
name=f"trainer_{rank}",
rank=rank,
world_size=world_size)
print(f"Worker {rank} done initializing RPC")
run_training_loop(rank, num_gpus, train_loader, test_loader)
rpc.shutdown()
請注意,與 run_parameter_server
類似,預設情況下,rpc.shutdown()
會等待所有工作節點(包含訓練器和參數伺服器)都呼叫 rpc.shutdown()
之後,此節點才會退出。這確保了節點會優雅地終止,且不會發生某個節點在另一個節點預期它在線上時離線的情況。
我們現在已完成訓練器和參數伺服器專用的程式碼,剩下的就是加入啟動訓練器和參數伺服器的程式碼。首先,我們必須接收適用於參數伺服器和訓練器的各種引數。world_size
對應於將參與訓練的節點總數,是所有訓練器和參數伺服器的總和。我們還必須為每個個別進程傳入一個唯一的 rank
,從 0(我們將在此執行單個參數伺服器)到 world_size - 1
。master_addr
和 master_port
是可用於識別 rank 0 進程在哪裡執行的引數,並且將被個別節點用於互相發現。若要在本地測試此範例,只需將 localhost
和相同的 master_port
傳遞給所有產生的執行個體。請注意,為了示範目的,此範例僅支援 0-2 個 GPU,儘管可以擴展此模式以使用更多 GPU。
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Parameter-Server RPC based training")
parser.add_argument(
"--world_size",
type=int,
default=4,
help="""Total number of participating processes. Should be the sum of
master node and all training nodes.""")
parser.add_argument(
"--rank",
type=int,
default=None,
help="Global rank of this process. Pass in 0 for master.")
parser.add_argument(
"--num_gpus",
type=int,
default=0,
help="""Number of GPUs to use for training, Currently supports between 0
and 2 GPUs. Note that this argument will be passed to the parameter servers.""")
parser.add_argument(
"--master_addr",
type=str,
default="localhost",
help="""Address of master, will default to localhost if not provided.
Master must be able to accept network traffic on the address + port.""")
parser.add_argument(
"--master_port",
type=str,
default="29500",
help="""Port that master is listening on, will default to 29500 if not
provided. Master must be able to accept network traffic on the host and port.""")
args = parser.parse_args()
assert args.rank is not None, "must provide rank argument."
assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})."
os.environ['MASTER_ADDR'] = args.master_addr
os.environ["MASTER_PORT"] = args.master_port
現在,我們將根據命令列引數建立一個對應於參數伺服器或訓練器的進程。如果我們傳入的 rank 為 0,我們將建立一個 ParameterServer
,否則建立一個 TrainerNet
。請注意,我們正在使用 torch.multiprocessing
來啟動一個對應於我們要執行的函式的子進程,並使用 p.join()
從主執行緒等待此進程完成。在初始化訓練器的情況下,我們還使用 PyTorch 的 資料載入器,以便在 MNIST 資料集上指定訓練和測試資料載入器。
processes = []
world_size = args.world_size
if args.rank == 0:
p = mp.Process(target=run_parameter_server, args=(0, world_size))
p.start()
processes.append(p)
else:
# Get data to train on
train_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32, shuffle=True,)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(
'../data',
train=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32,
shuffle=True,
)
# start training worker on this node
p = mp.Process(
target=run_worker,
args=(
args.rank,
world_size, args.num_gpus,
train_loader,
test_loader))
p.start()
processes.append(p)
for p in processes:
p.join()
若要在本地執行此範例,請在不同的終端機視窗中,針對伺服器和您想要產生的每個工作節點執行以下命令:python rpc_parameter_server.py --world_size=WORLD_SIZE --rank=RANK
。例如,對於 world size 為 2 的主節點,命令將為 python rpc_parameter_server.py --world_size=2 --rank=0
。然後可以使用命令 python rpc_parameter_server.py --world_size=2 --rank=1
在另一個視窗中啟動訓練器,這將開始使用一個伺服器和一個訓練器進行訓練。請注意,本教學課程假設訓練發生在使用 0 到 2 個 GPU 之間,並且可以通過將 --num_gpus=N
傳遞到訓練腳本來配置此引數。
您可以傳入命令列引數 --master_addr=ADDRESS
和 --master_port=PORT
來指示主工作節點正在監聽的位址和端口,例如,測試訓練器和主節點在不同機器上運行的功能。