• 教學 >
  • 使用 PyTorch 編寫分散式應用程式
捷徑

使用 PyTorch 編寫分散式應用程式

建立日期:2017 年 10 月 06 日 | 最後更新:2025 年 1 月 13 日 | 最後驗證:2024 年 11 月 05 日

作者Séb Arnold

注意

editgithub 中檢視和編輯本教學。

先決條件

在本簡短的教學中,我們將介紹 PyTorch 的分散式套件。我們將了解如何設定分散式環境、使用不同的通訊策略,並深入研究套件的一些內部結構。

設定

PyTorch 中包含的分散式套件 (即 torch.distributed) 使研究人員和從業者可以輕鬆地跨處理序和機器叢集平行化其計算。為此,它利用訊息傳遞語義,允許每個處理序將資料通訊到任何其他處理序。與多處理 (torch.multiprocessing) 套件相反,處理序可以使用不同的通訊後端,並且不限於在同一機器上執行。

為了開始使用,我們需要同時執行多個處理序的能力。如果您有權存取計算叢集,您應該諮詢您的本地系統管理員或使用您最喜歡的協調工具 (例如,pdshclustershellslurm)。為了本教學的目的,我們將使用單一機器並使用以下範本產生多個處理序。

"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    world_size = 2
    processes = []
    if "google.colab" in sys.modules:
        print("Running in Google Colab")
        mp.get_context("spawn")
    else:
        mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, world_size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

上面的腳本產生兩個處理序,每個處理序都將設定分散式環境、初始化處理序群組 (dist.init_process_group),並最終執行給定的 run 函數。

讓我們看看 init_process 函數。它確保每個處理序都能夠透過主機,使用相同的 IP 位址和埠進行協調。請注意,我們使用了 gloo 後端,但還有其他後端可用。(參見 第 5.1 節)我們將在本教學的結尾介紹 dist.init_process_group 中發生的魔法,但它本質上允許處理序透過共享其位置來彼此通訊。

點對點通訊

Send and Recv

傳送和接收

從一個處理序到另一個處理序的資料傳輸稱為點對點通訊。這些是透過 sendrecv 函數或它們的立即對應物 isendirecv 實現的。

"""Blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

在上面的範例中,兩個處理序都從零張量開始,然後處理序 0 遞增張量並將其傳送給處理序 1,以便它們都以 1.0 結束。請注意,處理序 1 需要分配記憶體才能儲存它將接收的資料。

另請注意,send/recv阻塞的:兩個處理序都會阻塞,直到通訊完成。另一方面,立即呼叫是非阻塞的;腳本繼續執行,並且這些方法會傳回一個 Work 物件,我們可以在其上選擇 wait()

"""Non-blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

使用立即呼叫時,我們必須小心使用傳送和接收的張量的方式。由於我們不知道資料何時會通訊到另一個處理序,因此我們不應在 req.wait() 完成之前修改傳送的張量或存取接收的張量。換句話說,

  • dist.isend() 之後寫入 tensor 將導致未定義的行為。

  • dist.irecv() 之後從 tensor 讀取將導致未定義的行為,直到執行 req.wait() 為止。

然而,在執行 req.wait() 之後,我們可以保證通訊已經發生,且 tensor[0] 中儲存的值為 1.0。

當我們想要更細緻地控制進程之間的通訊時,點對點通訊非常有用。 它們可用於實現複雜的演算法,例如 Baidu 的 DeepSpeechFacebook 的大規模實驗中所使用的演算法。(參考 第 4.1 節

集體通訊

Scatter

Scatter (分散)

Gather

Gather (收集)

Reduce

Reduce (歸約)

All-Reduce

All-Reduce (全歸約)

Broadcast

Broadcast (廣播)

All-Gather

All-Gather (全收集)

與點對點通訊相反,集體通訊允許跨 **群組** 中所有進程的通訊模式。 群組是所有進程的子集。 為了創建群組,我們可以將排名列表傳遞給 dist.new_group(group)。 預設情況下,集體通訊在所有進程上執行,也稱為 **world** (世界)。 例如,為了獲得所有進程上所有張量的總和,我們可以使用 dist.all_reduce(tensor, op, group) 集體通訊。

""" All-Reduce example."""
def run(rank, size):
    """ Simple collective communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

由於我們想要群組中所有張量的總和,因此我們使用 dist.ReduceOp.SUM 作為歸約運算子。 一般來說,任何可交換的數學運算都可以用作運算子。 PyTorch 開箱即用就提供了許多此類運算子,它們都在元素層級上工作

  • dist.ReduceOp.SUM,

  • dist.ReduceOp.PRODUCT,

  • dist.ReduceOp.MAX,

  • dist.ReduceOp.MIN,

  • dist.ReduceOp.BAND,

  • dist.ReduceOp.BOR,

  • dist.ReduceOp.BXOR,

  • dist.ReduceOp.PREMUL_SUM.

支援的運算子的完整列表請參考這裡

除了 dist.all_reduce(tensor, op, group) 之外,PyTorch 目前還實現了許多其他的集體通訊。 以下是一些支援的集體通訊。

  • dist.broadcast(tensor, src, group):將 tensorsrc 複製到所有其他進程。

  • dist.reduce(tensor, dst, op, group):將 op 應用於每個 tensor 並將結果儲存在 dst 中。

  • dist.all_reduce(tensor, op, group):與 reduce 相同,但結果儲存在所有進程中。

  • dist.scatter(tensor, scatter_list, src, group):將 \(i^{\text{th}}\) 個張量 scatter_list[i] 複製到第 \(i^{\text{th}}\) 個進程。

  • dist.gather(tensor, gather_list, dst, group):從 dst 中的所有進程複製 tensor

  • dist.all_gather(tensor_list, tensor, group):將所有進程中的 tensor 複製到所有進程上的 tensor_list

  • dist.barrier(group):阻塞 group 中的所有進程,直到每個進程都進入此函數。

  • dist.all_to_all(output_tensor_list, input_tensor_list, group):將輸入張量列表分散到群組中的所有進程,並在輸出列表中傳回收集到的張量列表。

支援的集體通訊的完整列表可以在 PyTorch Distributed 的最新文檔中找到 (連結)

分散式訓練

**注意:** 您可以在 這個 GitHub 儲存庫中找到本節的範例腳本。

現在我們了解了分散式模組的工作原理,讓我們用它寫一些有用的東西。 我們的目標是複製 DistributedDataParallel 的功能。 當然,這將是一個教學範例,在真實情況下,您應該使用上面連結的官方、經過良好測試和良好優化的版本。

很簡單,我們想要實現隨機梯度下降的分散式版本。 我們的腳本將讓所有進程計算其模型在其批次資料上的梯度,然後平均其梯度。 為了確保在更改進程數量時獲得相似的收斂結果,我們首先必須分割我們的資料集。 (您也可以使用 torch.utils.data.random_split,而不是下面的程式碼片段。)

""" Dataset partitioning helper """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()  # from random import Random
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

使用上面的程式碼片段,我們現在可以簡單地使用以下幾行程式碼分割任何資料集

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 // size
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

假設我們有 2 個副本,那麼每個進程將有一個 60000 / 2 = 30000 個樣本的 train_set。 我們還將批次大小除以副本數量,以維持 128 的批次大小。

我們現在可以編寫我們常用的 forward-backward-optimize 訓練程式碼,並新增一個函數呼叫來平均我們模型的梯度。 (以下內容很大程度上受到官方 PyTorch MNIST 範例的啟發。)

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

剩下的就是實現 average_gradients(model) 函數,它只是接受一個模型並在整個 world (世界) 中平均其梯度。

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size

萬歲! 我們成功地實現了分散式同步 SGD,並且可以在大型電腦叢集上訓練任何模型。

**注意:** 雖然最後一句話在技術上是正確的,但需要更多的技巧才能實現生產級別的同步 SGD 實現。 再次強調,請使用 已經過測試和優化 的版本。

我們自己的環狀 Allreduce

作為額外的挑戰,想像一下我們想要實現 DeepSpeech 的高效環狀 allreduce。 使用點對點集體通訊可以相當容易地實現這一點。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
   rank = dist.get_rank()
   size = dist.get_world_size()
   send_buff = send.clone()
   recv_buff = send.clone()
   accum = send.clone()

   left = ((rank - 1) + size) % size
   right = (rank + 1) % size

   for i in range(size - 1):
       if i % 2 == 0:
           # Send send_buff
           send_req = dist.isend(send_buff, right)
           dist.recv(recv_buff, left)
           accum[:] += recv_buff[:]
       else:
           # Send recv_buff
           send_req = dist.isend(recv_buff, right)
           dist.recv(send_buff, left)
           accum[:] += send_buff[:]
       send_req.wait()
   recv[:] = accum[:]

在上面的腳本中,allreduce(send, recv) 函數的簽名與 PyTorch 中的簽名略有不同。 它接受一個 recv 張量,並將所有 send 張量的總和儲存在其中。 作為留給讀者的練習,我們的版本與 DeepSpeech 中的版本仍然存在一個差異:他們的實現將梯度張量分成區塊,以便最佳地利用通訊頻寬。 (提示:torch.chunk

進階主題

現在我們準備好探索 torch.distributed 的一些更進階的功能。由於內容很多,本節分為兩個小節

  1. 通訊後端:我們將學習如何使用 MPI 和 Gloo 進行 GPU-GPU 通訊。

  2. 初始化方法:我們將了解如何在 dist.init_process_group() 中最佳地設定初始協調階段。

通訊後端

torch.distributed 最優雅的方面之一是它能夠抽象化並建立在不同的後端之上。如前所述,PyTorch 中實現了多個後端。其中一些最受歡迎的是 Gloo、NCCL 和 MPI。它們各自具有不同的規格和權衡,取決於所需的使用情境。支援功能的比較表可以在這裡找到。

Gloo 後端

到目前為止,我們已經廣泛使用 Gloo 後端。它作為一個開發平台非常方便,因為它包含在預編譯的 PyTorch 二進位檔案中,並且適用於 Linux (自 0.2 版起) 和 macOS (自 1.3 版起)。它支援 CPU 上的所有點對點和集體運算,以及 GPU 上的所有集體運算。 CUDA tensors 的集體運算實現不如 NCCL 後端提供的優化。

您肯定已經注意到,如果將 model 放在 GPU 上,我們的分散式 SGD 範例將無法運作。為了使用多個 GPU,讓我們也進行以下修改

  1. 使用 device = torch.device("cuda:{}".format(rank))

  2. model = Net() \(\rightarrow\) model = Net().to(device)

  3. 使用 data, target = data.to(device), target.to(device)

透過以上修改,我們的模型現在正在兩個 GPU 上進行訓練,您可以使用 watch nvidia-smi 監控它們的利用率。

MPI 後端

訊息傳遞介面 (MPI) 是高效能運算領域的標準化工具。 它允許進行點對點和集體通訊,並且是 torch.distributed API 的主要靈感來源。存在多種 MPI 實作(例如 Open-MPIMVAPICH2Intel MPI),每種都針對不同的目的進行了優化。 使用 MPI 後端的優勢在於 MPI 在大型電腦叢集上的廣泛可用性和高階的優化。一些最近的實作也能夠利用 CUDA IPC 和 GPU Direct 技術來避免透過 CPU 的記憶體複製。

不幸的是,PyTorch 的二進位檔案無法包含 MPI 實作,我們必須手動重新編譯它。 幸運的是,這個過程非常簡單,因為在編譯時,PyTorch 會自行尋找可用的 MPI 實作。 以下步驟透過從 原始碼安裝 PyTorch 來安裝 MPI 後端。

  1. 建立並啟用您的 Anaconda 環境,按照 指南安裝所有先決條件,但不要執行 python setup.py install

  2. 選擇並安裝您最喜歡的 MPI 實作。 請注意,啟用 CUDA-aware MPI 可能需要一些額外的步驟。 在我們的例子中,我們將堅持使用不含 GPU 支援的 Open-MPI:conda install -c conda-forge openmpi

  3. 現在,前往您克隆的 PyTorch 儲存庫並執行 python setup.py install

為了測試我們新安裝的後端,需要進行一些修改。

  1. if __name__ == '__main__': 下的內容替換為 init_process(0, 0, run, backend='mpi')

  2. 執行 mpirun -n 4 python myscript.py

進行這些更改的原因是 MPI 需要在產生進程之前建立自己的環境。 MPI 也會產生自己的進程並執行 初始化方法中描述的握手,使 init_process_groupranksize 參數變得多餘。 這實際上非常強大,因為您可以將其他參數傳遞給 mpirun,以便為每個進程客製化運算資源。(例如每個進程的核心數、手動將機器分配給特定排名,以及更多) 這樣做,您應該獲得與其他通訊後端相同的熟悉輸出。

NCCL 後端

NCCL 後端針對 CUDA tensors 提供集體運算的優化實作。如果您僅對集體運算使用 CUDA tensors,請考慮使用此後端以獲得最佳性能。 NCCL 後端包含在具有 CUDA 支援的預構建二進位檔案中。

初始化方法

為了結束本教學課程,讓我們檢查我們調用的初始函數:dist.init_process_group(backend, init_method)。 具體來說,我們將討論各種初始化方法,這些方法負責每個進程之間的初步協調步驟。 這些方法使您可以定義如何完成此協調。

初始化方法的選擇取決於您的硬體設定,並且一種方法可能比其他方法更合適。 除了以下各節之外,請參閱官方文件以獲取更多資訊。

環境變數

在本教學課程中,我們一直在使用環境變數初始化方法。透過在所有機器上設定以下四個環境變數,所有進程將能夠正確連接到主機,獲取有關其他進程的資訊,並最終與它們握手。

  • MASTER_PORT:將託管排名為 0 的進程的機器上的可用埠。

  • MASTER_ADDR:將託管排名為 0 的進程的機器的 IP 位址。

  • WORLD_SIZE:進程總數,以便主機能夠知道要等待多少個工作進程。

  • RANK: 每個process的排序(Rank),讓它們知道自己是主節點(master)還是工作節點(worker)。

共享檔案系統

共享檔案系統要求所有 processes 都能存取一個共享的檔案系統,並且透過一個共享檔案來協調它們。這表示每個 process 都會開啟該檔案、寫入其資訊,並等待直到所有 process 都完成。之後,所有需要的資訊就能被所有 processes 輕易取得。為了避免競爭條件 (race conditions),檔案系統必須支援透過 fcntl 進行鎖定。

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

TCP

透過 TCP 初始化可以藉由提供 Rank 0 的 process 的 IP 位址和一個可存取的埠號來達成。在這裡,所有工作節點 (workers) 都能夠連接到 Rank 0 的 process,並交換如何互相連接的資訊。

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)

致謝

我想感謝 PyTorch 開發者們在他們的實作、文件和測試上做得如此出色。當程式碼不清楚時,我總是可以依靠 文件測試 來找到答案。特別地,我想感謝 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 提供富有洞察力的評論並回答早期草稿的問題。

文件

存取 PyTorch 的完整開發者文件

檢視文件

教學

取得針對初學者和進階開發者的深入教學課程

檢視教學課程

資源

尋找開發資源並獲得問題解答

檢視資源