• 教學 >
  • 使用非同步執行實現批次 RPC 處理
捷徑

使用非同步執行實現批次 RPC 處理

建立於:2020 年 7 月 28 日 | 最後更新:2024 年 11 月 13 日 | 最後驗證:未驗證

作者: Shen Li

注意

editgithub 中檢視和編輯本教學。

先決條件

本教學示範如何使用 @rpc.functions.async_execution 裝飾器構建批次處理 RPC 應用程式,這有助於透過減少被封鎖的 RPC 執行緒數量並整合呼叫端的 CUDA 運算來加速訓練。這與 使用 TorchServe 進行批次推論 共享相同的概念。

注意

本教學需要 PyTorch v1.6.0 或以上版本。

基本概念

先前的教學已展示使用 torch.distributed.rpc 構建分散式訓練應用程式的步驟,但它們並未詳細說明在處理 RPC 請求時,呼叫端會發生什麼事。從 PyTorch v1.5 開始,每個 RPC 請求都會封鎖呼叫端上的一個執行緒,以執行該請求中的函數,直到該函數返回。這適用於許多用例,但有一個警告。如果使用者函數封鎖 IO,例如,使用巢狀 RPC 調用,或發送信號,例如,等待不同的 RPC 請求解除封鎖,則呼叫端上的 RPC 執行緒將必須閒置等待,直到 IO 完成或發送信號事件發生。因此,RPC 呼叫端可能使用比必要更多的執行緒。這個問題的原因是 RPC 將使用者函數視為黑盒子,並且對函數中發生的事情知之甚少。為了允許使用者函數讓出並釋放 RPC 執行緒,需要向 RPC 系統提供更多提示。

從 v1.6.0 開始,PyTorch 透過引入兩個新概念來解決此問題

  • 一個 torch.futures.Future 類型,封裝了非同步執行,也支援安裝回呼函數。

  • 一個 @rpc.functions.async_execution 裝飾器,允許應用程式告訴呼叫端,目標函數將返回一個 future,並且可以在執行期間暫停和讓出多次。

透過這兩個工具,應用程式程式碼可以將使用者函數分解為多個較小的函數,將它們鏈接在一起作為 Future 物件上的回呼,並返回包含最終結果的 Future。在呼叫端,當獲取 Future 物件時,它也會安裝後續的 RPC 回應準備和通訊作為回呼,這些回呼將在最終結果準備好時觸發。透過這種方式,呼叫端不再需要封鎖一個執行緒並等待,直到最終傳回值準備好。請參閱 @rpc.functions.async_execution 的 API 文件,以獲取簡單的範例。

除了減少呼叫端上的閒置執行緒數量之外,這些工具還有助於使批次 RPC 處理更容易且更快。本教學的以下兩個章節示範如何使用 @rpc.functions.async_execution 裝飾器構建分散式批次更新參數伺服器和批次處理強化學習應用程式。

批次更新參數伺服器

考慮一個同步參數伺服器訓練應用程式,其中包含一個參數伺服器 (PS) 和多個訓練器。 在此應用程式中,PS 保留參數並等待所有訓練器報告梯度。 在每次迭代中,它會等到收到所有訓練器的梯度,然後一次性更新所有參數。 下面的程式碼顯示了 PS 類別的實作。 update_and_fetch_model 方法使用 @rpc.functions.async_execution 裝飾,並將由訓練器呼叫。 每次調用都會傳回一個 Future 物件,該物件將填充更新後的模型。 大多數訓練器啟動的調用僅將梯度累積到 .grad 欄位,立即傳回,並讓出 PS 上的 RPC 執行緒。 最後到達的訓練器將觸發最佳化器步驟並消耗所有先前報告的梯度。 然後,它會使用更新後的模型設定 future_model,進而透過 Future 物件通知來自其他訓練器的所有先前請求,並將更新後的模型傳送給所有訓練器。

import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim

num_classes, batch_update_size = 30, 5

class BatchUpdateParameterServer(object):
    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        # Using the RRef to retrieve the local PS instance
        self = ps_rref.local_value()
        with self.lock:
            self.curr_update_size += 1
            # accumulate gradients into .grad field
            for p, g in zip(self.model.parameters(), grads):
                p.grad += g

            # Save the current future_model and return it to make sure the
            # returned Future object holds the correct model even if another
            # thread modifies future_model before this thread returns.
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                # update the model
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                # by settiing the result on the Future object, all previous
                # requests expecting this updated model will be notified and
                # the their responses will be sent accordingly.
                fut.set_result(self.model)
                self.future_model = torch.futures.Future()

        return fut

對於訓練器,它們都使用來自 PS 的同一組參數進行初始化。 在每次迭代中,每個訓練器首先運行前向和後向傳遞,以在本地產生梯度。 然後,每個訓練器使用 RPC 將其梯度報告給 PS,並透過同一 RPC 請求的傳回值取回更新後的參數。 在訓練器的實作中,目標函數是否標記有 @rpc.functions.async_execution 沒有任何區別。 訓練器僅使用 rpc_sync 呼叫 update_and_fetch_model,這將在訓練器上阻塞,直到傳回更新後的模型。

batch_size, image_w, image_h  = 20, 64, 64

class Trainer(object):
    def __init__(self, ps_rref):
        self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(6):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        # get initial model parameters
        m = self.ps_rref.rpc_sync().get_model().cuda()
        # start training
        for inputs, labels in self.get_next_batch():
            self.loss_fn(m(inputs), labels).backward()
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()

我們在本教學中跳過啟動多個進程的程式碼,請參考 examples 儲存庫以取得完整的實作。 請注意,可以實現批次處理而無需 @rpc.functions.async_execution 裝飾器。 但是,這需要阻塞 PS 上更多的 RPC 執行緒,或者使用另一輪 RPC 來提取更新後的模型,而後者會增加程式碼複雜性和更多的通訊開銷。

本節使用一個簡單的參數伺服器訓練範例來展示如何使用 @rpc.functions.async_execution 裝飾器實作批次 RPC 應用程式。 在下一節中,我們使用批次處理重新實作先前 分散式 RPC 框架入門 教學中的強化學習範例,並展示其對訓練速度的影響。

批次處理 CartPole 求解器

本節使用 OpenAI Gym 中的 CartPole-v1 作為範例來展示批次處理 RPC 的效能影響。 請注意,由於目標是展示 @rpc.functions.async_execution 的用法,而不是構建最佳的 CartPole 求解器或解決大多數不同的 RL 問題,因此我們使用非常簡單的策略和獎勵計算策略,並專注於多觀察者單代理批次 RPC 實作。 我們使用與先前教學類似的 Policy 模型,如下所示。 與先前的教學相比,不同之處在於它的建構子採用了一個額外的 batch 參數,該參數控制 F.softmaxdim 參數,因為透過批次處理,forward 函數中的 x 參數包含來自多個觀察者的狀態,因此需要正確地更改維度。 其他一切都保持不變。

import argparse
import torch.nn as nn
import torch.nn.functional as F

parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
                    help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                    help='number of episodes (default: 10)')
args = parser.parse_args()

torch.manual_seed(args.seed)

class Policy(nn.Module):
    def __init__(self, batch=True):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
        self.dim = 2 if batch else 1

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=self.dim)

Observer 的建構子也會相應地調整。 它還採用一個 batch 參數,該參數控制它使用哪個 Agent 函數來選擇動作。 在批次模式下,它在 Agent 上呼叫 select_action_batch 函數,稍後將會介紹,並且該函數將使用 @rpc.functions.async_execution 裝飾。

import gym
import torch.distributed.rpc as rpc

class Observer:
    def __init__(self, batch=True):
        self.id = rpc.get_worker_info().id - 1
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.select_action = Agent.select_action_batch if batch else Agent.select_action

與先前的教學 分散式 RPC 框架入門 相比,觀察者的行為略有不同。 它並非在環境停止時退出,而是在每個 episode 中始終運行 n_steps 次迭代。 當環境傳回時,觀察者只需重置環境並重新開始。 透過此設計,代理將從每個觀察者接收固定數量的狀態,因此可以將它們打包成固定大小的張量。 在每個步驟中,Observer 使用 RPC 將其狀態傳送到 Agent,並透過傳回值提取動作。 在每個 episode 的結尾,它會將所有步驟的獎勵傳回給 Agent。 請注意,此 run_episode 函數將由 Agent 使用 RPC 呼叫。 因此,此函數中的 rpc_sync 呼叫將是一個巢狀 RPC 調用。 我們也可以將此函數標記為 @rpc.functions.async_execution,以避免阻塞 Observer 上的一個執行緒。 但是,由於瓶頸是 Agent 而不是 Observer,因此阻塞 Observer 進程上的一個執行緒應該沒問題。

import torch

class Observer:
    ...

    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), NUM_STEPS
        rewards = torch.zeros(n_steps)
        start_step = 0
        for step in range(n_steps):
            state = torch.from_numpy(state).float().unsqueeze(0)
            # send the state to the agent to get an action
            action = rpc.rpc_sync(
                agent_rref.owner(),
                self.select_action,
                args=(agent_rref, self.id, state)
            )

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)
            rewards[step] = reward

            if done or step + 1 >= n_steps:
                curr_rewards = rewards[start_step:(step + 1)]
                R = 0
                for i in range(curr_rewards.numel() -1, -1, -1):
                    R = curr_rewards[i] + args.gamma * R
                    curr_rewards[i] = R
                state = self.env.reset()
                if start_step == 0:
                    ep_reward = min(ep_reward, step - start_step + 1)
                start_step = step + 1

        return [rewards, ep_reward]

Agent 的建構子也採用一個 batch 參數,該參數控制如何批次處理動作機率。 在批次模式下,saved_log_probs 包含一個張量列表,其中每個張量包含來自一個步驟中所有觀察者的動作機率。 如果沒有批次處理,則 saved_log_probs 是一個字典,其中鍵是觀察者 ID,值是該觀察者的動作機率列表。

import threading
from torch.distributed.rpc import RRef

class Agent:
    def __init__(self, world_size, batch=True):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.policy = Policy(batch).cuda()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.running_reward = 0

        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
            self.rewards[ob_info.id] = []

        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        self.batch = batch
        self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.future_actions = torch.futures.Future()
        self.lock = threading.Lock()
        self.pending_states = len(self.ob_rrefs)

非批次處理 select_acion 只需將狀態丟入策略,儲存動作機率,並立即將動作傳回給觀察者。

from torch.distributions import Categorical

class Agent:
    ...

    @staticmethod
    def select_action(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        probs = self.policy(state.cuda())
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

透過批次處理,狀態會儲存在一個二維張量 self.states 中,使用觀察者 ID 作為列 ID。然後,它會透過安裝回呼函數來將 Future 串連到批次產生的 self.future_actions Future 物件。該物件將會使用該觀察者的 ID 來填入特定的列。最後一個到達的觀察者會一次性地透過 policy 執行所有批次處理的狀態,並相應地設定 self.future_actions。當這種情況發生時,所有安裝在 self.future_actions 上的回呼函數將會被觸發,並且它們的回傳值將會被用來填入串連的 Future 物件,這反過來會通知 Agent 為所有先前來自其他觀察者的 RPC 請求準備並傳達回應。

class Agent:
    ...

    @staticmethod
    @rpc.functions.async_execution
    def select_action_batch(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        self.states[ob_id].copy_(state)
        future_action = self.future_actions.then(
            lambda future_actions: future_actions.wait()[ob_id].item()
        )

        with self.lock:
            self.pending_states -= 1
            if self.pending_states == 0:
                self.pending_states = len(self.ob_rrefs)
                probs = self.policy(self.states.cuda())
                m = Categorical(probs)
                actions = m.sample()
                self.saved_log_probs.append(m.log_prob(actions).t()[0])
                future_actions = self.future_actions
                self.future_actions = torch.futures.Future()
                future_actions.set_result(actions.cpu())
        return future_action

現在讓我們定義不同的 RPC 函數是如何拼接在一起的。Agent 控制每個 episode 的執行。它首先使用 rpc_async 在所有觀察者上啟動 episode,並阻塞於回傳的 future,該 future 將會填入觀察者回報。請注意,下面的程式碼使用 RRef 輔助函數 ob_rref.rpc_async()ob_rref RRef 的擁有者上啟動 run_episode 函數,並提供參數。然後,它會將儲存的動作機率和回傳的觀察者回報轉換為預期的資料格式,並啟動訓練步驟。最後,它會重置所有狀態並回傳當前 episode 的回報。此函數是執行一個 episode 的入口點。

class Agent:
    ...

    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))

        # wait until all obervers have finished this episode
        rets = torch.futures.wait_all(futs)
        rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
        ep_rewards = sum([ret[1] for ret in rets]) / len(rets)

        # stack saved probs into one tensor
        if self.batch:
            probs = torch.stack(self.saved_log_probs)
        else:
            probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
            probs = torch.stack(probs)

        policy_loss = -probs * rewards / len(rets)
        policy_loss.sum().backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

        # reset variables
        self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)

        # calculate running rewards
        self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
        return ep_rewards, self.running_reward

其餘程式碼是正常的程序啟動和日誌記錄,與其他 RPC 教學類似。在本教學中,所有觀察者都會被動地等待來自 agent 的命令。請參考 examples repo 以取得完整實作。

def run_worker(rank, world_size, n_episode, batch, print_log=True):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size, batch)
        for i_episode in range(n_episode):
            last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)

            if print_log:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i_episode, last_reward, running_reward))
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from agents
    rpc.shutdown()


def main():
    for world_size in range(2, 12):
        delays = []
        for batch in [True, False]:
            tik = time.time()
            mp.spawn(
                run_worker,
                args=(world_size, args.num_episode, batch),
                nprocs=world_size,
                join=True
            )
            tok = time.time()
            delays.append(tok - tik)

        print(f"{world_size}, {delays[0]}, {delays[1]}")


if __name__ == '__main__':
    main()

批次 RPC 有助於將動作推論整合到較少的 CUDA 運算中,因此降低了分攤的 overhead。上面的 main 函數使用不同數量的觀察者(範圍從 1 到 10)在批次和非批次模式下執行相同的程式碼。下圖繪製了使用預設參數值時,不同 world size 的執行時間。結果證實了我們的預期,即批次處理有助於加速訓練。

文件

取得 PyTorch 的完整開發者文件

檢視文件

教學

取得適用於初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並取得您問題的解答

檢視資源