分散式資料並行入門¶
建立於:2019 年 4 月 23 日 | 最後更新:2024 年 10 月 30 日 | 最後驗證:2024 年 11 月 05 日
作者: Shen Li
編輯: Joe Zhu, Chirag Pandya
注意
在 github 上檢視和編輯本教學文件。
先決條件
DistributedDataParallel (DDP) 是 PyTorch 中一個強大的模組,可讓您在多個機器上並行處理您的模型,使其非常適合大規模深度學習應用程式。 若要使用 DDP,您需要產生多個程序,並且每個程序建立一個 DDP 實例。
但它是如何運作的? DDP 使用來自 torch.distributed 套件的集體通訊 (collective communications) 來同步所有程序之間的梯度和緩衝區。 這表示每個程序都會有自己的模型副本,但它們會協同工作以訓練模型,就像它在單一機器上一樣。
為了實現這一點,DDP 為模型中的每個參數註冊一個 autograd hook。 當執行反向傳播時,此 hook 會觸發並觸發所有程序之間的梯度同步。 這可確保每個程序都具有相同的梯度,然後用於更新模型。
如需 DDP 如何運作以及如何有效使用它的更多資訊,請務必查看 DDP 設計筆記。 透過 DDP,您可以比以往更快、更有效率地訓練模型!
建議使用 DDP 的方式是為每個模型副本產生一個程序。 模型副本可以跨越多個裝置。 DDP 程序可以放置在同一部機器上或跨越多部機器。 請注意,GPU 裝置無法在 DDP 程序之間共享 (即一個 DDP 程序使用一個 GPU)。
在本教學文件中,我們將從基本的 DDP 用例開始,然後展示更進階的用例,包括檢查點模型以及將 DDP 與模型並行結合。
注意
本教學文件中的程式碼在 8-GPU 伺服器上執行,但可以輕鬆地推廣到其他環境。
DataParallel
和 DistributedDataParallel
之間的比較¶
在我們深入研究之前,讓我們先釐清為什麼您會考慮使用 DistributedDataParallel
而不是 DataParallel
,儘管它增加了複雜性
首先,
DataParallel
是單程序、多執行緒的,但它只能在單一機器上運作。 相反地,DistributedDataParallel
是多程序,並且同時支援單機器和多機器訓練。 由於執行緒之間的 GIL 爭用、每次迭代複製模型,以及由分散輸入和收集輸出所引入的額外負擔,即使在單一機器上,DataParallel
通常也比DistributedDataParallel
慢。回顧一下先前的教學,如果你的模型太大而無法容納在單個 GPU 上,你必須使用模型並行將其分割到多個 GPU 上。
DistributedDataParallel
可與模型並行一起使用,而DataParallel
目前則不行。當 DDP 與模型並行結合使用時,每個 DDP 進程都會使用模型並行,而所有進程整體上將使用資料並行。
基本使用案例¶
要建立一個 DDP 模組,你必須首先正確設定進程組。更多詳細資訊可以在使用 PyTorch 編寫分散式應用程式中找到。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
現在,讓我們建立一個玩具模組,用 DDP 包裝它,並餵給它一些虛擬輸入資料。請注意,由於 DDP 會將模型狀態從 rank 0 進程廣播到 DDP 建構函式中的所有其他進程,因此你無需擔心不同的 DDP 進程從不同的初始模型參數值開始。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running basic DDP example on rank {rank}.")
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
如你所見,DDP 包裝了較低層級的分散式通訊細節,並提供了一個乾淨的 API,彷彿它是一個本地模型。梯度同步通訊發生在反向傳播期間,並與反向計算重疊。當 backward()
返回時,param.grad
已經包含同步的梯度張量。對於基本的使用案例,DDP 只需要幾行程式碼來設定進程組。當將 DDP 應用於更進階的使用案例時,一些注意事項需要謹慎處理。
處理速度偏差¶
在 DDP 中,建構函式、正向傳播和反向傳播都是分散式同步點。預期不同的進程啟動相同數量的同步,並以相同的順序到達這些同步點,並在大致相同的時間進入每個同步點。否則,快速的進程可能會提前到達並在等待落後者時逾時。因此,使用者有責任平衡跨進程的工作負載分配。有時,由於網路延遲、資源爭用或無法預測的工作負載高峰等原因,處理速度的偏差是不可避免的。為了避免在這些情況下逾時,請確保在呼叫 init_process_group 時傳遞足夠大的 timeout
值。
儲存和載入檢查點¶
通常使用 torch.save
和 torch.load
來在訓練期間檢查點模組,並從檢查點恢復。有關更多詳細資訊,請參閱儲存和載入模型。當使用 DDP 時,一種最佳化方法是僅在一個進程中儲存模型,然後在所有進程上載入它,從而減少寫入開銷。這是可行的,因為所有進程都從相同的參數開始,並且梯度在反向傳播中同步,因此最佳化器應保持將參數設定為相同的值。如果你使用此最佳化(即在一個進程上儲存,但在所有進程上恢復),請確保在儲存完成之前沒有進程開始載入。此外,當載入模組時,你需要提供適當的 map_location
參數,以防止進程進入其他進程的裝置。如果缺少 map_location
,torch.load
將首先將模組載入到 CPU,然後將每個參數複製到其儲存位置,這將導致同一機器上的所有進程使用同一組裝置。有關更進階的故障恢復和彈性支援,請參閱 TorchElastic。
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
print(f"Finished running DDP checkpoint example on rank {rank}.")
將 DDP 與模型並行性結合使用¶
DDP 也適用於多 GPU 模型。當訓練具有大量資料的大型模型時,DDP 包裝多 GPU 模型尤其有用。
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
當將多 GPU 模型傳遞給 DDP 時,不得設定 device_ids
和 output_device
。輸入和輸出資料將由應用程式或模型 forward()
方法放置在適當的裝置中。
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running DDP with model parallel example on rank {rank}.")
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
world_size = n_gpus//2
run_demo(demo_model_parallel, world_size)
使用 torch.distributed.run/torchrun 初始化 DDP¶
我們可以利用 PyTorch Elastic 來簡化 DDP 程式碼並更輕鬆地初始化作業。讓我們仍然使用 Toymodel 範例並建立一個名為 elastic_ddp.py
的檔案。
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic():
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
dist.init_process_group("nccl")
rank = dist.get_rank()
print(f"Start running basic DDP example on rank {rank}.")
# create model and move it to GPU with id rank
device_id = rank % torch.cuda.device_count()
model = ToyModel().to(device_id)
ddp_model = DDP(model, device_ids=[device_id])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_id)
loss_fn(outputs, labels).backward()
optimizer.step()
dist.destroy_process_group()
print(f"Finished running basic DDP example on rank {rank}.")
if __name__ == "__main__":
demo_basic()
然後,可以在所有節點上執行 torch elastic/torchrun 命令,以初始化上面建立的 DDP 作業
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py
在上面的範例中,我們在兩台主機上執行 DDP 腳本,並且我們在每台主機上執行 8 個進程。也就是說,我們在 16 個 GPU 上執行此作業。請注意,$MASTER_ADDR
在所有節點上必須相同。
在這裡,torchrun
將啟動 8 個進程,並在它啟動的節點上的每個進程上調用 elastic_ddp.py
,但使用者還需要應用像 slurm 這樣的叢集管理工具,才能真正在 2 個節點上執行此命令。
例如,在啟用 SLURM 的叢集上,我們可以編寫一個腳本來執行上面的命令,並將 MASTER_ADDR
設定為
export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)
然後,我們可以使用 SLURM 命令:srun --nodes=2 ./torchrun_script.sh
執行此腳本。
這只是一個範例;你可以選擇自己的叢集排程工具來啟動 torchrun
作業。
有關 Elastic run 的更多資訊,請參閱快速入門文件。