• 文件 >
  • 分散式 RPC 框架
捷徑

分散式 RPC 框架

分散式 RPC 框架提供了一種多機器模型訓練機制,透過一組用於遠端通訊的原語,以及一種更高階的 API 來自動區分跨多台機器的拆分模型。

警告

RPC 套件中的 API 是穩定的。目前有多項工作正在進行中,以改善效能和錯誤處理,這些將在未來的版本中發布。

警告

CUDA 支援是在 PyTorch 1.9 中引入的,並且仍然是一個 beta 功能。RPC 套件並非所有功能都與 CUDA 支援相容,因此不建議使用。這些不受支援的功能包括:RRefs、JIT 相容性、dist autograd 和 dist optimizer,以及 profiling。這些缺點將在未來的版本中解決。

注意

請參考 PyTorch 分散式概述 以簡要介紹與分散式訓練相關的所有功能。

基礎知識

分散式 RPC 框架可以輕鬆地遠端執行函數、支援引用遠端物件而無需複製實際資料,並提供 autograd 和優化器 API 來透明地運行反向傳播並更新跨 RPC 邊界的參數。這些功能可以分為四組 API。

  1. 遠端程序呼叫 (Remote Procedure Call, RPC) 支援在指定的目標 worker 上執行函數,並傳入指定的參數,取回回傳值,或建立回傳值的參照。主要有三個 RPC API:rpc_sync() (同步)、rpc_async() (非同步) 和 remote() (非同步並回傳遠端回傳值的參照)。如果使用者程式碼在沒有回傳值的情況下無法繼續執行,請使用同步 API。否則,使用非同步 API 取得 future,並在呼叫端需要回傳值時等待 future 完成。remote() API 適用於需要在遠端建立某些東西,但永遠不需要將其取回呼叫端的情況。想像一下一個驅動程式進程正在設置參數伺服器和訓練器。驅動程式可以在參數伺服器上建立一個 embedding table,然後將 embedding table 的參照與訓練器共享,但它本身永遠不會在本地使用該 embedding table。在這種情況下,rpc_sync()rpc_async() 已經不再適用,因為它們總是暗示回傳值將立即或在未來返回給呼叫端。

  2. 遠端參考 (Remote Reference, RRef) 作為指向本地或遠端物件的分散式共享指標。它可以與其他 worker 共享,並且會透明地處理參考計數。每個 RRef 只有一個所有者,物件只存在於該所有者上。持有 RRef 的非所有者 worker 可以透過顯式請求,從所有者那裡取得物件的副本。當一個 worker 需要訪問某些資料物件,但它既不是建立者(remote() 的呼叫者)也不是物件的所有者時,這非常有用。我們將在下面討論的分散式優化器 (distributed optimizer) 就是這種用例的一個例子。

  3. 分散式自動微分 (Distributed Autograd) 將參與正向傳播的所有 worker 上的本地自動微分引擎縫合在一起,並在反向傳播期間自動聯繫它們以計算梯度。如果在執行分散式模型並行訓練、參數伺服器訓練等時,正向傳播需要跨越多個機器,這尤其有用。有了這個功能,使用者程式碼不再需要擔心如何跨 RPC 邊界傳送梯度,以及應該以什麼順序啟動本地自動微分引擎,如果正向傳播中有巢狀和相互依賴的 RPC 呼叫,這可能會變得非常複雜。

  4. 分散式優化器 (Distributed Optimizer) 的建構子接受一個 Optimizer() (例如,SGD()Adagrad() 等) 和一個參數 RRef 列表,在每個不同的 RRef 所有者上建立一個 Optimizer() 實例,並在執行 step() 時相應地更新參數。當您有分散式的正向和反向傳播時,參數和梯度將分散在多個 worker 上,因此需要在每個相關的 worker 上都有一個優化器。分散式優化器將所有這些本地優化器包裝成一個,並提供簡潔的建構子和 step() API。

RPC

在使用 RPC 和分散式自動微分 primitives 之前,必須進行初始化。要初始化 RPC 框架,我們需要使用 init_rpc(),它會初始化 RPC 框架、RRef 框架和分散式自動微分。

torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[原始碼][原始碼]

初始化 RPC primitives,例如本地 RPC agent 和分散式自動微分,這會立即使當前進程準備好傳送和接收 RPC。

參數
  • name (str) – 此節點的全域唯一名稱。(例如,Trainer3ParameterServer2MasterWorker1) 名稱只能包含數字、字母、底線、冒號和/或破折號,並且長度必須小於 128 個字元。

  • backend (BackendType, optional) – RPC backend 實作的類型。支援的值為 BackendType.TENSORPIPE (預設)。有關更多信息,請參閱 Backends

  • rank (int) – 此節點的全域唯一 ID/rank。

  • world_size (int) – 群組中的 worker 數量。

  • rpc_backend_options ( RpcBackendOptions, optional) – 傳遞給 RpcAgent 建構子的選項。它必須是 RpcBackendOptions 的代理程式特定子類別,並包含代理程式特定的初始化配置。預設情況下,對於所有代理程式,它會將預設逾時時間設定為 60 秒,並使用 init_method = "env://"初始化的底層進程組進行 rendezvous,這表示需要正確設定環境變數 MASTER_ADDRMASTER_PORT。請參閱 Backends 以取得更多資訊,並找到可用的選項。

以下 API 允許使用者遠端執行函數,以及建立對遠端資料物件的參考 (RRefs)。在這些 API 中,當傳遞 Tensor 作為引數或傳回值時,目標 worker 將嘗試建立具有相同 meta (即 shape、stride 等) 的 Tensor。我們有意禁止傳輸 CUDA tensors,因為如果來源和目標 worker 上的裝置列表不匹配,可能會導致崩潰。在這種情況下,如果需要,應用程式始終可以在呼叫端上顯式地將輸入 tensors 移至 CPU,並在被呼叫端上將其移至所需的裝置。

警告

RPC 中的 TorchScript 支援是一項原型功能,可能會發生變化。從 v1.5.0 開始,torch.distributed.rpc 支援呼叫 TorchScript 函數作為 RPC 目標函數,這將有助於提高被呼叫端的並行性,因為執行 TorchScript 函數不需要 GIL。

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source][source]

進行阻塞式 RPC 呼叫,以在 worker to 上執行函數 func。 RPC 訊息的傳送和接收與 Python 程式碼的執行並行。此方法是執行緒安全的。

參數
  • to ( strWorkerInfoint) – 目標 worker 的名稱/rank/WorkerInfo

  • func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 add())和帶註釋的 TorchScript 函數。

  • args ( tuple) – func 呼叫的引數元組。

  • kwargs ( dict) – 是 func 呼叫的關鍵字引數字典。

  • timeout ( float, optional) – 用於此 RPC 的逾時時間(以秒為單位)。如果 RPC 未在此時間內完成,則會引發一個例外,指示它已逾時。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用初始化期間或使用 _set_rpc_timeout 設定的預設值。

傳回

傳回使用 argskwargs 執行 func 的結果。

範例:

請確保在兩個 worker 上正確設定 MASTER_ADDRMASTER_PORT。有關更多詳細資訊,請參閱 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然後在兩個不同的進程中執行以下程式碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 執行 TorchScript 函數的範例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source][source]

進行非阻塞式 RPC 呼叫,以在 worker to 上執行函數 func。 RPC 訊息的傳送和接收與 Python 程式碼的執行並行。此方法是執行緒安全的。此方法會立即傳回一個 Future,可以等待它完成。

參數
  • to ( strWorkerInfoint) – 目標 worker 的名稱/rank/WorkerInfo

  • func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 add())和帶註釋的 TorchScript 函數。

  • args ( tuple) – func 呼叫的引數元組。

  • kwargs ( dict) – 是 func 呼叫的關鍵字引數字典。

  • timeout ( float, optional) – 用於此 RPC 的逾時時間(以秒為單位)。如果 RPC 未在此時間內完成,則會引發一個例外,指示它已逾時。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用初始化期間或使用 _set_rpc_timeout 設定的預設值。

傳回

傳回一個 Future 物件,可以等待它完成。完成後,可以從 Future 物件中檢索 funcargskwargs 上的傳回值。

警告

由於我們不支援透過網路傳送 GPU tensors,因此不支援將 GPU tensors 用作 func 的引數或傳回值。您需要在將 GPU tensors 用作 func 的引數或傳回值之前,顯式地將它們複製到 CPU。

警告

rpc_async API 在透過網路傳送引數 tensors 時,並不會複製其儲存空間,這個動作可能會由不同的執行緒執行,取決於 RPC 後端的類型。呼叫者應確保這些 tensors 的內容在傳回的 Future 完成之前保持不變。

範例:

請確保在兩個 worker 上正確設定 MASTER_ADDRMASTER_PORT。有關更多詳細資訊,請參閱 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然後在兩個不同的進程中執行以下程式碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 執行 TorchScript 函數的範例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source][source]

發起遠端呼叫,在 worker to 上執行 func,並立即傳回結果值的 RRef。 Worker to 將會是傳回的 RRef 的擁有者,而呼叫 remote 的 worker 則為使用者。 擁有者會管理其 RRef 的全域參考計數,而且只有在全域沒有對其作用中的參考時,擁有者 RRef 才會被銷毀。

參數
  • to ( strWorkerInfoint) – 目標 worker 的名稱/rank/WorkerInfo

  • func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 add())和帶註釋的 TorchScript 函數。

  • args ( tuple) – func 呼叫的引數元組。

  • kwargs ( dict) – 是 func 呼叫的關鍵字引數字典。

  • timeout (float, optional) – 此遠端呼叫的逾時時間 (秒)。 如果 worker to 上的這個 RRef 建立未在此逾時時間內在此 worker 上成功處理,則下次嘗試使用 RRef (例如 to_here()) 時,將會引發逾時,指出此失敗。 值 0 表示無限逾時,即永遠不會引發逾時錯誤。 如果未提供,則使用初始化期間或使用 _set_rpc_timeout 設定的預設值。

傳回

結果值的使用者 RRef 實例。 使用封鎖型 API torch.distributed.rpc.RRef.to_here() 以在本機檢索結果值。

警告

remote API 在透過網路傳送引數 tensors 之前,不會複製其儲存空間,這個動作可能會由不同的執行緒執行,取決於 RPC 後端的類型。 呼叫者應確保這些 tensors 的內容在傳回的 RRef 經擁有者確認之前保持不變,可以使用 torch.distributed.rpc.RRef.confirmed_by_owner() API 進行檢查。

警告

諸如 remote API 的逾時之類的錯誤會盡力處理。 這表示當 remote 發起的遠端呼叫失敗時 (例如出現逾時錯誤),我們會採用盡力而為的方式來處理錯誤。 這表示錯誤會以非同步方式處理並設定在產生的 RRef 上。 如果在處理之前應用程式尚未使用 RRef (例如 to_here 或 fork 呼叫),則將來使用 RRef 將會適當地引發錯誤。 然而,使用者應用程式可能會在錯誤處理之前使用 RRef。 在這種情況下,可能不會引發錯誤,因為它們尚未處理。

範例

Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)

>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.get_worker_info(worker_name=None)[source][source]

取得給定 worker 名稱的 WorkerInfo。 使用此 WorkerInfo 以避免在每次調用時傳遞昂貴的字串。

參數

worker_name (str) – worker 的字串名稱。 如果為 None,則傳回目前 worker 的 ID。 (預設 None)

傳回

給定 worker_nameWorkerInfo 實例,如果 worker_nameNone,則為目前 worker 的 WorkerInfo

torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source][source]

執行 RPC 代理的關閉,然後銷毀 RPC 代理。這會停止本機代理接收未完成的請求,並透過終止所有 RPC 執行緒來關閉 RPC 框架。如果 graceful=True,則會封鎖直到所有本機和遠端 RPC 程序都到達此方法,並等待所有未完成的工作完成。否則,如果 graceful=False,則這是本機關閉,並且不會等待其他 RPC 程序到達此方法。

警告

對於由 Future 物件經由 rpc_async() 傳回的,在 shutdown() 之後不應呼叫 future.wait()

參數

graceful (bool) – 是否執行優雅關閉。如果為 True,則將 1) 等待直到沒有任何未完成的 UserRRefs 系統訊息並刪除它們;2) 封鎖直到所有本機和遠端 RPC 程序都已到達此方法,並等待所有未完成的工作完成。

範例:

請確保在兩個 worker 上正確設定 MASTER_ADDRMASTER_PORT。有關更多詳細資訊,請參閱 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然後在兩個不同的進程中執行以下程式碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
class torch.distributed.rpc.WorkerInfo

一個封裝系統中 worker 資訊的結構。 包含 worker 的名稱和 ID。 此類別不應直接建構,而是可以透過 get_worker_info() 檢索一個實例,並將結果傳遞到諸如 rpc_sync()rpc_async()remote() 之類的函數中,以避免每次呼叫時都複製字串。

property id

全域唯一的 ID 用於識別 worker。

property name

worker 的名稱。

RPC 套件還提供裝飾器,允許應用程式指定如何在被呼叫端處理給定的函數。

torch.distributed.rpc.functions.async_execution(fn)[原始碼][原始碼]

一個函數裝飾器,指示函數的回傳值保證是一個 Future 物件,並且此函數可以在 RPC 被呼叫端非同步執行。 更具體地說,被呼叫端會提取包裝函數傳回的 Future,並將後續處理步驟安裝為該 Future 的回呼。 已安裝的回呼將在完成時從 Future 讀取值,並將該值作為 RPC 回應傳送回去。 這也意味著傳回的 Future 僅存在於被呼叫端,並且永遠不會透過 RPC 傳送。 當包裝函數 (fn) 的執行需要暫停和恢復時,此裝飾器很有用,例如,由於包含 rpc_async() 或等待其他訊號。

注意

為了啟用非同步執行,應用程式必須將此裝飾器傳回的函數物件傳遞給 RPC API。 如果 RPC 檢測到由此裝飾器安裝的屬性,它就會知道此函數傳回一個 Future 物件,並會相應地處理。 然而,這並不意味著此裝飾器必須是定義函數時的最外層的裝飾器。 例如,當與 @staticmethod@classmethod 結合使用時,@rpc.functions.async_execution 需要是內層裝飾器,以允許目標函數被識別為靜態或類別函數。 此目標函數仍然可以非同步執行,因為在存取時,靜態或類別方法會保留由 @rpc.functions.async_execution 安裝的屬性。

範例:

傳回的 Future 物件可以來自 rpc_async()then()Future 建構子。 下面的範例顯示直接使用 then() 傳回的 Future

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # This function runs on "worker1" and returns immediately when
>>>     # the callback is installed through the `then(cb)` API. In the
>>>     # mean time, the `rpc_async` to "worker2" can run concurrently.
>>>     # When the return value of that `rpc_async` arrives at
>>>     # "worker1", "worker1" will run the lambda function accordingly
>>>     # and set the value for the previously returned `Future`, which
>>>     # will then trigger RPC to send the result back to "worker0".
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # prints tensor([3., 3.])

當與 TorchScript 裝飾器結合使用時,此裝飾器必須是最外層的裝飾器。

>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # prints tensor([2., 2.])

當與靜態或類別方法結合使用時,此裝飾器必須是最內層的裝飾器。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])

此裝飾器也適用於 RRef 輔助函數,即 torch.distributed.rpc.RRef.rpc_sync()torch.distributed.rpc.RRef.rpc_async()torch.distributed.rpc.RRef.remote()

>>> from torch.distributed import rpc
>>>
>>> # reuse the AsyncExecutionClass class above
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # prints tensor([4., 4.])

後端

RPC 模組可以利用不同的後端來執行節點之間的通訊。要使用的後端可以在 init_rpc() 函數中指定,方法是傳遞 BackendType 列舉的特定值。無論使用哪個後端,RPC API 的其餘部分都不會改變。每個後端也定義了 RpcBackendOptions 類別的子類別,該類別的實例也可以傳遞給 init_rpc() 以配置後端的行為。

class torch.distributed.rpc.BackendType(value)

可用後端的列舉類別。

PyTorch 內建 BackendType.TENSORPIPE 後端。可以使用 register_backend() 函數註冊其他後端。

class torch.distributed.rpc.RpcBackendOptions

封裝傳遞到 RPC 後端的選項的抽象結構。此類別的實例可以傳遞到 init_rpc() 中,以便使用特定配置初始化 RPC,例如 RPC 超時和要使用的 init_method

property init_method

指定如何初始化處理程序群組的 URL。預設值為 env://

property rpc_timeout

一個浮點數,表示所有 RPC 使用的逾時時間。如果 RPC 未在此時間範圍內完成,它將以一個例外完成,表明它已逾時。

TensorPipe 後端

TensorPipe 代理程式是預設代理程式,它利用 TensorPipe 庫,該庫提供本質上適合機器學習的點對點通訊原語,從根本上解決了 Gloo 的一些限制。與 Gloo 相比,它具有非同步的優勢,允許大量傳輸同時發生,每個傳輸以其自身的速度進行,而不會相互阻塞。它只會在需要時按需打開節點對之間的管道,當一個節點發生故障時,只會關閉其事件管道,而所有其他管道將保持正常工作。此外,它能夠支援多種不同的傳輸方式(當然包括 TCP,還有共享記憶體、NVLink、InfiniBand 等),並且可以自動偵測它們的可用性並協商用於每個管道的最佳傳輸方式。

TensorPipe 後端已在 PyTorch v1.6 中引入,並且正在積極開發中。目前,它僅支援 CPU 張量,GPU 支援即將推出。它帶有基於 TCP 的傳輸方式,就像 Gloo 一樣。它還可以自動將大型張量分塊並多路複用到多個 Socket 和線程上,以實現非常高的頻寬。代理程式將能夠自行選擇最佳傳輸方式,而無需任何干預。

範例

>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20 # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source][source]

TensorPipeAgent 的後端選項,源自 RpcBackendOptions

參數
  • num_worker_threads (int, optional) – TensorPipeAgent 中用於執行請求的線程池中的線程數(預設值:16)。

  • rpc_timeout (float, optional) – RPC 請求的預設逾時時間(以秒為單位)(預設值:60 秒)。如果 RPC 未在此時間範圍內完成,將會引發一個例外,表明已逾時。呼叫者可以在 rpc_sync()rpc_async() 中覆寫個別 RPC 的此逾時時間(如果需要)。

  • init_method (str, optional) – 用於初始化用於會合的分散式儲存體的 URL。它接受與 init_process_group() 的相同參數所接受的任何值(預設值:env://)。

  • device_maps (Dict[str, Dict], optional) – 從此 worker 到被呼叫方的裝置放置對應。鍵是被呼叫方的 worker 名稱,值是字典(Dict of int, str, or torch.device),它將此 worker 的裝置對應到被呼叫方的 worker 的裝置。(預設值:None

  • devices (List[int, str, 或 torch.device], optional) – RPC Agent 使用的所有本機 CUDA 裝置。預設情況下,它會根據自身 device_maps 中的所有本機裝置,以及來自其對等節點 device_maps 中對應的裝置進行初始化。在處理 CUDA RPC 請求時,Agent 會適當地同步此 List 中所有裝置的 CUDA stream。

property device_maps

裝置映射位置。

property devices

本機 Agent 使用的所有裝置。

property init_method

指定如何初始化處理程序群組的 URL。預設值為 env://

property num_worker_threads

TensorPipeAgent 用於執行請求的執行緒池中的執行緒數量。

property rpc_timeout

一個浮點數,表示所有 RPC 使用的逾時時間。如果 RPC 未在此時間範圍內完成,它將以一個例外完成,表明它已逾時。

set_device_map(to, device_map)[source][source]

設定每個 RPC 呼叫者和被呼叫者配對之間的裝置映射。可以多次呼叫此函式來增量新增裝置放置設定。

參數
  • to (str) – 被呼叫者名稱。

  • device_map (Dict of int, str, or torch.device) – 從此 Worker 到被呼叫者的裝置放置映射。此映射必須可反轉。

範例

>>> # both workers
>>> def add(x, y):
>>>     print(x)  # tensor([1., 1.], device='cuda:1')
>>>     return x + y, (x + y).to(2)
>>>
>>> # on worker 0
>>> options = TensorPipeRpcBackendOptions(
>>>     num_worker_threads=8,
>>>     device_maps={"worker1": {0: 1}}
>>>     # maps worker0's cuda:0 to worker1's cuda:1
>>> )
>>> options.set_device_map("worker1", {1: 2})
>>> # maps worker0's cuda:1 to worker1's cuda:2
>>>
>>> rpc.init_rpc(
>>>     "worker0",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.TENSORPIPE,
>>>     rpc_backend_options=options
>>> )
>>>
>>> x = torch.ones(2)
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
>>> # The first argument will be moved to cuda:1 on worker1. When
>>> # sending the return value back, it will follow the invert of
>>> # the device map, and hence will be moved back to cuda:0 and
>>> # cuda:1 on worker0
>>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
>>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
set_devices(devices)[source][source]

設定 TensorPipe RPC Agent 使用的本機裝置。在處理 CUDA RPC 請求時,TensorPipe RPC Agent 會適當地同步此 List 中所有裝置的 CUDA stream。

參數

devices (List of int, str, or torch.device) – TensorPipe RPC Agent 使用的本機裝置。

注意

RPC 框架不會自動重試任何 rpc_sync()rpc_async()remote() 呼叫。原因是 RPC 框架無法判斷操作是否具有等冪性,以及重試是否安全。因此,應用程式有責任處理故障,並在必要時進行重試。 RPC 通訊基於 TCP,因此故障可能會因網路故障或間歇性網路連線問題而發生。在這種情況下,應用程式需要使用合理的退避策略適當地重試,以確保網路不會被過於激進的重試壓垮。

RRef

警告

使用 CUDA Tensor 時,目前不支援 RRef。

RRef (Remote REFerence) 是對遠端 Worker 上某種類型 T(例如 Tensor)的值的參考。此控制代碼使被參考的遠端值在擁有者上保持活動狀態,但並不表示該值將來會傳輸到本機 Worker。 RRef 可用於多機器訓練,方法是保留對其他 Worker 上存在的 nn.Modules 的參考,並呼叫適當的函式以在訓練期間檢索或修改其參數。有關更多詳細資訊,請參閱 遠端參考協定

class torch.distributed.rpc.PyRRef(RRef)

一個類別,封裝對遠端 Worker 上某種類型的值的參考。此控制代碼將使被參考的遠端值在 Worker 上保持活動狀態。當 1) 應用程式碼和本機 RRef 環境中都沒有對它的參考,或者 2) 應用程式呼叫了正常關機時,UserRRef 將被刪除。在已刪除的 RRef 上呼叫方法會導致未定義的行為。 RRef 實作僅提供盡力而為的錯誤檢測,並且應用程式不應在 rpc.shutdown() 之後使用 UserRRefs

警告

RRef 只能透過 RPC 模組進行序列化和反序列化。在沒有 RPC 的情況下序列化和反序列化 RRef(例如,Python pickle、torch save() / load()、JIT save() / load() 等)將導致錯誤。

參數
  • value (object) – 將被此 RRef 包裹的值。

  • type_hint (Type, optional) – 應作為 value 的類型提示傳遞給 TorchScript 編譯器的 Python 類型。

範例:

以下範例為了簡化,跳過了 RPC 初始化和關閉程式碼。 有關詳細信息,請參閱 RPC 文件。

  1. 使用 rpc.remote 建立 RRef

>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> # get a copy of value from the RRef
>>> x = rref.to_here()
  1. 從本地物件建立 RRef

>>> import torch
>>> from torch.distributed.rpc import RRef
>>> x = torch.zeros(2, 2)
>>> rref = RRef(x)
  1. 與其他 workers 分享 RRef

>>> # On both worker0 and worker1:
>>> def f(rref):
>>>   return rref.to_here() + 1
>>> # On worker0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>> rref = RRef(torch.zeros(2, 2))
>>> # the following RPC shares the rref with worker1, reference
>>> # count is automatically updated.
>>> rpc.rpc_sync("worker1", f, args=(rref,))
backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None

使用 RRef 作為反向傳播的根來執行反向傳播。 如果提供了 dist_autograd_ctx_id,我們將使用提供的 ctx_id 從 RRef 的所有者開始執行分散式反向傳播。 在這種情況下,應使用 get_gradients() 檢索梯度。 如果 dist_autograd_ctx_idNone,則假定這是一個本地 autograd 圖,並且我們僅執行本地反向傳播。 在本地情況下,調用此 API 的節點必須是 RRef 的所有者。 RRef 的值應為純量 Tensor。

參數
  • dist_autograd_ctx_id (int, optional) – 我們應在其中檢索梯度的分散式 autograd 上下文 ID (預設值:-1)。

  • retain_graph (bool, optional) – 如果 False,則將釋放用於計算 grad 的圖。 請注意,在幾乎所有情況下,都不需要將此選項設定為 True,並且通常可以通過更有效的方式來解決。 通常,您需要將其設定為 True 才能多次執行反向傳播(預設值:False)。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     rref.backward(context_id)
confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool

傳回此 RRef 是否已由所有者確認。 OwnerRRef 始終傳回 true,而 UserRRef 僅在所有者知道此 UserRRef 時才傳回 true。

is_owner(self: torch._C._distributed_rpc.PyRRef) bool

傳回目前節點是否為此 RRef 的所有者。

local_value(self: torch._C._distributed_rpc.PyRRef) object

如果目前節點是所有者,則傳回對本地值的參考。 否則,拋出例外。

owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo

傳回擁有此 RRef 的節點的工作者資訊。

owner_name(self: torch._C._distributed_rpc.PyRRef) str

傳回擁有此 RRef 的節點的工作者名稱。

remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

建立一個輔助代理 (helper proxy),以便輕鬆使用 RRef 的擁有者作為目的地來啟動 remote,從而在該 RRef 所參考的物件上執行函式。 更具體地說,rref.remote().func_name(*args, **kwargs) 與以下程式碼相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
參數

timeout (float, optional) – rref.remote() 的逾時時間。 如果此 RRef 的建立未在逾時時間內成功完成,則下次嘗試使用 RRef(例如 to_here)時,將會引發逾時錯誤。 如果未提供,則將使用預設的 RPC 逾時。 請參閱 rpc.remote() 以了解 RRef 的特定逾時語意。

範例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # returns torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # returns tensor([[1., 1., 1., 1.]])
rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

建立一個輔助代理,以便輕鬆使用 RRef 的擁有者作為目的地來啟動 rpc_async,從而在該 RRef 所參考的物件上執行函式。 更具體地說,rref.rpc_async().func_name(*args, **kwargs) 與以下程式碼相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
參數

timeout (float, optional) – rref.rpc_async() 的逾時時間。 如果呼叫未在此時限內完成,則會引發指示此情況的例外。 如果未提供此引數,則將使用預設的 RPC 逾時。

範例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # returns torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # returns tensor([[1., 1., 1., 1.]])
rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

建立一個輔助代理,以便輕鬆使用 RRef 的擁有者作為目的地來啟動 rpc_sync,從而在該 RRef 所參考的物件上執行函式。 更具體地說,rref.rpc_sync().func_name(*args, **kwargs) 與以下程式碼相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
參數

timeout (float, optional) – rref.rpc_sync() 的逾時時間。 如果呼叫未在此時限內完成,則會引發指示此情況的例外。 如果未提供此引數,則將使用預設的 RPC 逾時。

範例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # returns torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # returns tensor([[1., 1., 1., 1.]])
to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

封鎖式呼叫,將 RRef 的值從擁有者複製到本機節點並傳回。 如果目前節點是擁有者,則傳回對本機值的參考。

參數

timeout ( float, optional) – to_here 的逾時時間。如果呼叫未在此時間範圍內完成,將會引發一個例外,指出逾時。如果未提供此引數,則將使用預設的 RPC 逾時時間 (60 秒)。

RemoteModule

警告

使用 CUDA tensors 時,目前不支援 RemoteModule。

RemoteModule 提供一個簡單的方式在不同的程序上遠端建立一個 nn.Module。實際的模組位於遠端主機上,但本地主機擁有此模組的句柄,並且可以像一般的 nn.Module 一樣調用此模組。 然而,調用會產生對遠端的 RPC 呼叫,並且可以透過 RemoteModule 支援的額外 API 進行非同步執行。

class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[原始碼][原始碼]

只有在 RPC 初始化之後才能建立 RemoteModule 實例。

它在指定的遠端節點上建立一個使用者指定的模組。 它的行為類似於常規的 nn.Module,只是 forward 方法在遠端節點上執行。 它負責 autograd 記錄,以確保向後傳遞將梯度傳播回相應的遠端模組。

它基於 module_clsforward 方法的簽名產生兩個方法 forward_asyncforwardforward_async 異步運行並返回一個 Future。 forward_asyncforward 的引數與 module_cls 返回的模組的 forward 方法相同。

例如,如果 module_cls 返回一個 nn.Linear 的實例,該實例具有 forward 方法簽名: def forward(input: Tensor) -> Tensor:,則生成的 RemoteModule 將具有 2 個具有簽名的方法

def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
參數
  • remote_device ( str) – 我們希望將此模組放置在目標 worker 上的裝置。 格式應為“<worker名稱>/<裝置>”,其中裝置欄位可以解析為 torch.device 類型。 例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。 此外,裝置欄位可以是可選的,預設值為“cpu”。

  • module_cls ( nn.Module) –

    要在遠端建立的模組的類別。 例如,

    >>> class MyModule(nn.Module):
    >>>     def forward(input):
    >>>         return input + 1
    >>>
    >>> module_cls = MyModule
    

  • args (Sequence, optional) – 要傳遞給 module_cls 的 args。

  • kwargs (Dict, optional) – 要傳遞給 module_cls 的 kwargs。

傳回

遠端模組實例,它封裝了使用者提供的 module_cls 建立的 Module,它具有一個阻塞的 forward 方法和一個非同步的 forward_async 方法,該方法返回遠端使用者提供的模組上 forward 呼叫的 future。

範例:

在兩個不同的程序中運行以下程式碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>>     "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

此外,可以在此教學中找到一個更實際的範例,該範例與DistributedDataParallel (DDP) 結合使用。

get_module_rref()[原始碼]

返回指向遠端模組的 RRef (RRef[nn.Module])。

返回類型

RRef[Module]

remote_parameters(recurse=True)[原始碼]

返回指向遠端模組參數的 RRef 列表。

這通常可以與 DistributedOptimizer 結合使用。

參數

recurse (bool) – 如果為 True,則傳回遠端模組及其所有子模組的參數。否則,僅傳回屬於遠端模組直接成員的參數。

傳回

遠端模組參數的 RRef 列表 (List[RRef[nn.Parameter]])。

返回類型

List[RRef[Parameter]]

分散式 Autograd 框架

警告

目前在使用 CUDA tensors 時,不支援分散式 autograd。

此模組提供基於 RPC 的分散式 autograd 框架,可用於模型並行訓練等應用程式。簡而言之,應用程式可以通過 RPC 發送和接收梯度記錄 tensors。在正向傳遞中,我們記錄何時通過 RPC 發送梯度記錄 tensors,在反向傳遞期間,我們使用此資訊執行使用 RPC 的分散式反向傳遞。 更多詳情請參閱 分散式 Autograd 設計

torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None

使用提供的根啟動分散式反向傳遞。 目前,此方法實作 FAST 模式演算法,該演算法假設跨 worker 在同一個分散式 autograd 內容中發送的所有 RPC 訊息都將成為反向傳遞期間 autograd 圖的一部分。

我們使用提供的根來發現 autograd 圖並計算適當的依賴關係。 此方法會阻塞,直到完成整個 autograd 計算。

我們在每個節點上適當的 torch.distributed.autograd.context 中累積梯度。 要使用的 autograd 內容會根據呼叫 torch.distributed.autograd.backward() 時傳入的 context_id 進行查閱。 如果沒有與給定 ID 對應的有效 autograd 內容,我們會拋出錯誤。 您可以使用 get_gradients() API 檢索累積的梯度。

參數
  • context_id (int) – 我們應該為其檢索梯度的 autograd 內容 ID。

  • roots (list) – 代表 autograd 計算的根的 Tensors。 所有 tensors 都應該是純量。

  • retain_graph (bool, optional) – 如果為 False,則將釋放用於計算 grad 的圖。 請注意,在幾乎所有情況下,都不需要將此選項設定為 True,並且通常可以以更有效的方式解決。 通常,您需要將此設定為 True 才能多次執行 backward。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
class torch.distributed.autograd.context[source][source]

當使用分散式 autograd 時,包裝正向和反向傳遞的內容物件。 在 with 陳述式中產生的 context_id 是唯一識別所有 worker 上的分散式反向傳遞所必需的。 每個 worker 都儲存與此 context_id 相關聯的中繼資料,這是正確執行分散式 autograd 傳遞所必需的。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]

檢索從 Tensor 到與給定 context_id 對應的提供內容中累積的該 Tensor 的適當梯度的對應,作為分散式 autograd 反向傳遞的一部分。

參數

context_id (int) – 我們應該為其檢索梯度的 autograd 內容 ID。

傳回

一種對應,其中鍵是 Tensor,值是該 Tensor 的關聯梯度。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])

分散式優化器

請參閱 torch.distributed.optim 頁面,以取得關於分散式優化器的文件。

設計筆記

分散式自動微分設計筆記涵蓋了基於 RPC 的分散式自動微分框架的設計,該框架對於模型並行訓練等應用非常有用。

RRef 設計筆記涵蓋了 RRef (遠端參考) 協議的設計,該協議用於透過框架引用遠端工作節點上的值。

教學課程

RPC 教學課程向使用者介紹 RPC 框架,提供使用 torch.distributed.rpc API 的幾個範例應用,並示範如何使用 分析器 (profiler) 來分析基於 RPC 的工作負載。

文件

存取 PyTorch 的完整開發人員文件

檢視文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並獲得您的問題解答

檢視資源