分散式自動微分設計¶
本筆記將介紹分散式自動微分的詳細設計,並逐步說明其內部機制。在繼續之前,請確保您熟悉自動微分機制和分散式 RPC 框架。
背景¶
假設你有兩個節點,以及一個非常簡單的模型被分割到兩個節點上。這可以使用 torch.distributed.rpc
來實現,如下所示:
import torch
import torch.distributed.rpc as rpc
def my_add(t1, t2):
return torch.add(t1, t2)
# On worker 0:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
分散式自動微分背後的主要動機是,能夠在這些分散式模型上運行反向傳播,並使用我們計算出的 loss
,並為所有需要梯度的張量記錄適當的梯度。
前向傳播期間的自動微分紀錄¶
PyTorch 在前向傳播期間建立自動微分圖,此圖用於執行反向傳播。有關更多詳細信息,請參閱 自動微分如何編碼歷史記錄。
對於分散式自動微分,我們需要追蹤前向傳播期間的所有 RPC,以確保反向傳播得到適當的執行。為此,當我們執行 RPC 時,我們會將 send
和 recv
函數附加到自動微分圖。
send
函數附加到 RPC 的來源,其輸出邊指向 RPC 輸入張量的自動微分函數。此函數在反向傳播期間的輸入是從目的地作為適當的recv
函數的輸出接收的。recv
函數附加到 RPC 的目的地,其輸入是使用輸入張量從目的地執行的運算符中檢索的。此函數的輸出梯度會在反向傳播期間發送到源節點,並傳送到適當的send
函數。每個
send-recv
對都被分配一個全局唯一的autograd_message_id
,以唯一識別該對。這在反向傳播期間查找遠程節點上的相應函數非常有用。對於 RRef,每當我們呼叫
torch.distributed.rpc.RRef.to_here()
時,我們都會為涉及的張量附加適當的send-recv
對。
舉例來說,以下是我們上面範例的自動微分圖的樣子(為了簡化起見,排除了 t5.sum())

分散式自動微分上下文¶
每個使用分散式自動微分的前向和反向傳播都會被分配一個唯一的 torch.distributed.autograd.context
,並且此上下文具有全局唯一的 autograd_context_id
。此上下文會根據需要在每個節點上建立。
此上下文具有以下用途:
多個運行分散式反向傳播的節點可能會在同一個張量上累積梯度,因此,在我們有機會運行優化器之前,張量的
.grad
欄位將具有來自各種分散式反向傳播的梯度。這類似於在本地多次呼叫torch.autograd.backward()
。為了提供一種分離每個反向傳播的梯度的方法,梯度會累積在每個反向傳播的torch.distributed.autograd.context
中。在前向傳播期間,我們會將每個自動微分傳遞的
send
和recv
函數儲存在此上下文中。這確保我們持有自動微分圖中適當節點的參考,以保持其活動狀態。除此之外,在反向傳播期間也很容易查找適當的send
和recv
函數。一般而言,我們也會使用此上下文來儲存每個分散式自動微分傳遞的一些元數據。
從使用者的角度來看,自動微分上下文的設定如下:
import torch.distributed.autograd as dist_autograd
with dist_autograd.context() as context_id:
loss = model.forward()
dist_autograd.backward(context_id, loss)
重要的是要注意,模型的正向傳播必須在分散式自動微分上下文管理器中調用,因為需要一個有效的上下文,以確保所有 send
和 recv
函數都已正確儲存,以便在所有參與節點上運行反向傳播。
分散式反向傳播¶
在本節中,我們概述了在分散式反向傳播期間準確計算依賴關係的挑戰,並描述了關於我們如何執行分散式反向傳播的幾種算法(具有權衡)。
計算依賴關係¶
考慮在單個機器上運行的以下程式碼片段:
import torch
a = torch.rand((3, 3), requires_grad=True)
b = torch.rand((3, 3), requires_grad=True)
c = torch.rand((3, 3), requires_grad=True)
d = a + b
e = b * c
d.sum.().backward()
以下是上述程式碼的自動微分圖的樣子:

自動微分引擎作為反向傳播的一部分執行的第一步是計算自動微分圖中每個節點的依賴關係數量。這有助於自動微分引擎了解圖中的節點何時準備好執行。add(1)
和 mul(0)
中括號中的數字表示依賴關係的數量。正如你所看到的,這意味著在反向傳播期間,add
節點需要 1 個輸入,而 mul
節點不需要任何輸入(換句話說,不需要執行)。本地自動微分引擎通過從根節點(在本例中為 d
)遍歷圖來計算這些依賴關係。
自動微分圖中的某些節點可能不會在反向傳播中執行,這對分散式自動微分提出了挑戰。考慮使用 RPC 的程式碼片段:
import torch
import torch.distributed.rpc as rpc
a = torch.rand((3, 3), requires_grad=True)
b = torch.rand((3, 3), requires_grad=True)
c = torch.rand((3, 3), requires_grad=True)
d = rpc.rpc_sync("worker1", torch.add, args=(a, b))
e = rpc.rpc_sync("worker1", torch.mul, args=(b, c))
loss = d.sum()
上述程式碼的相關自動微分圖將是:

計算此分散式自動微分圖的依賴關係更具挑戰性,並且需要一些開銷(無論是在計算還是網路通訊方面)。
對於性能敏感的應用,我們可以通過假設每個 send
和 recv
函數作為反向傳播的一部分都是有效的來避免大量的開銷(大多數應用程式不執行未使用的 RPC)。這簡化了分散式自動微分算法並且效率更高,但代價是應用程式需要了解限制。此演算法稱為 快速模式演算法,並在下面詳細描述。
在一般情況下,並非每個 send
和 recv
函式都必須在反向傳播中有效。為了處理這個問題,我們提出了一個 SMART 模式演算法,將在後面的章節中描述。請注意,目前僅實作了 FAST 模式演算法。
FAST 模式演算法¶
此演算法的關鍵假設是,當我們執行反向傳播時,每個 send
函式都有 1 個依賴項。 換句話說,我們假設我們會透過 RPC 從另一個節點接收梯度。
該演算法如下:
我們從具有反向傳播根節點的 worker 開始(所有根節點都必須是本地的)。
查詢目前 分散式 Autograd 上下文的所有
send
函式。從提供的根節點和我們檢索到的所有
send
函式開始,在本地計算依賴關係。計算依賴關係後,使用提供的根節點啟動本地 autograd 引擎。
當 autograd 引擎執行
recv
函式時,recv
函式會透過 RPC 將輸入梯度發送到適當的 worker。 每個recv
函式都知道目標 worker id,因為它已作為正向傳播的一部分記錄。recv
函式也會將autograd_context_id
和autograd_message_id
發送到遠端主機。當遠端主機收到此請求時,我們使用
autograd_context_id
和autograd_message_id
來查找適當的send
函式。如果 worker 是第一次收到指定
autograd_context_id
的請求,它將如上述第 1-3 點所述,在本地計算依賴關係。然後,將在 6. 中檢索到的
send
函式排隊,以便在該 worker 的本地 autograd 引擎上執行。最後,我們不是在 Tensor 的
.grad
欄位上累積梯度,而是針對每個 分散式 Autograd 上下文 分別累積梯度。 梯度儲存在Dict[Tensor, Tensor]
中,這基本上是從 Tensor 到其關聯梯度的映射,並且可以使用get_gradients()
API 檢索此映射。
作為範例,使用分散式 autograd 的完整程式碼如下所示:
import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
def my_add(t1, t2):
return torch.add(t1, t2)
# On worker 0:
# Setup the autograd context. Computations that take
# part in the distributed backward pass must be within
# the distributed autograd context manager.
with dist_autograd.context() as context_id:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
# Run the backward pass.
dist_autograd.backward(context_id, [loss])
# Retrieve the gradients from the context.
dist_autograd.get_gradients(context_id)
具有依賴關係的分散式 autograd 圖表如下所示(為了簡單起見,排除 t5.sum())

應用於上述範例的 FAST 模式演算法 如下:
在
Worker 0
上,我們從根節點loss
和send1
開始計算依賴關係。 因此,send1
標記為依賴項 1,而Worker 0
上的mul
標記為依賴項 1。現在,我們在
Worker 0
上啟動本地 autograd 引擎。 我們首先執行mul
函式,將其輸出在 autograd 上下文中累積為t4
的梯度。 然後,我們執行recv2
,它將梯度發送到Worker 1
。由於
Worker 1
是第一次聽說這個反向傳播,因此它會啟動依賴關係計算,並適當地標記send2
、add
和recv1
的依賴關係。接下來,我們將
send2
排隊到Worker 1
的本地 autograd 引擎上,它會依次執行add
和recv1
。當執行
recv1
時,它會將梯度傳送到Worker 0
。由於
Worker 0
已經計算出此反向傳播的依賴關係,因此它只會在本地排隊和執行send1
。最後,
t1
、t2
和t4
的梯度會在 分散式 Autograd 上下文中累積。
分散式優化器¶
DistributedOptimizer
的運作方式如下:
接受要優化的遠端參數列表 (
RRef
)。這些也可以是封裝在本地RRef
中的本地參數。接受一個
Optimizer
類別作為本地優化器,在所有不同的RRef
持有者上執行。分散式優化器會在每個 worker 節點上建立本地
Optimizer
的實例,並持有對它們的RRef
。當呼叫
torch.distributed.optim.DistributedOptimizer.step()
時,分散式優化器會使用 RPC 遠端執行適當遠端 worker 上的所有本地優化器。分散式 autogradcontext_id
必須作為torch.distributed.optim.DistributedOptimizer.step()
的輸入提供。本地優化器會使用它來應用儲存在相應上下文中的梯度。如果多個並行的分散式優化器正在更新 worker 上的相同參數,這些更新會透過鎖定序列化。
簡單的端到端範例¶
總而言之,以下是一個使用分散式 autograd 和分散式優化器的簡單端到端範例。如果程式碼放置在名為“dist_autograd_simple.py”的檔案中,則可以使用命令 MASTER_ADDR="localhost" MASTER_PORT=29500 python dist_autograd_simple.py
執行它
import torch
import torch.multiprocessing as mp
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer
def random_tensor():
return torch.rand((3, 3), requires_grad=True)
def _run_process(rank, dst_rank, world_size):
name = "worker{}".format(rank)
dst_name = "worker{}".format(dst_rank)
# Initialize RPC.
rpc.init_rpc(
name=name,
rank=rank,
world_size=world_size
)
# Use a distributed autograd context.
with dist_autograd.context() as context_id:
# Forward pass (create references on remote nodes).
rref1 = rpc.remote(dst_name, random_tensor)
rref2 = rpc.remote(dst_name, random_tensor)
loss = rref1.to_here() + rref2.to_here()
# Backward pass (run distributed autograd).
dist_autograd.backward(context_id, [loss.sum()])
# Build DistributedOptimizer.
dist_optim = DistributedOptimizer(
optim.SGD,
[rref1, rref2],
lr=0.05,
)
# Run the distributed optimizer step.
dist_optim.step(context_id)
def run_process(rank, world_size):
dst_rank = (rank + 1) % world_size
_run_process(rank, dst_rank, world_size)
rpc.shutdown()
if __name__ == '__main__':
# Run world_size workers
world_size = 2
mp.spawn(run_process, args=(world_size,), nprocs=world_size)