• 教學 >
  • TorchRL 目標:編寫 DDPG 損失
捷徑

TorchRL 目標:編寫 DDPG 損失

建立於:2023 年 8 月 14 日 | 最後更新:2025 年 1 月 27 日 | 最後驗證:未驗證

作者: Vincent Moens

概述

TorchRL 將 RL 演算法的訓練分成多個部分,這些部分將在您的訓練腳本中組裝:環境、資料收集和儲存、模型,以及最終的損失函數。

TorchRL 損失(或“目標”)是有狀態的物件,其中包含可訓練的參數(策略和價值模型)。本教學將引導您完成從頭開始使用 TorchRL 編寫損失的步驟。

為此,我們將專注於 DDPG,這是一種相對簡單的演算法,可以進行編碼。Deep Deterministic Policy Gradient (DDPG) 是一種簡單的連續控制演算法。它包括學習動作-觀察對的參數化價值函數,然後學習一種策略,該策略輸出在給定特定觀察時最大化此價值函數的動作。

您將學到什麼

  • 如何編寫損失模組並自訂其價值估計器;

  • 如何在 TorchRL 中建立環境,包括轉換(例如,資料正規化)和平行執行;

  • 如何設計策略和價值網路;

  • 如何有效地從您的環境收集資料並將其儲存在重播緩衝區中;

  • 如何在您的重播緩衝區中儲存軌跡(而不是轉換);

  • 如何評估您的模型。

先決條件

本教學假設您已完成PPO 教學,其中概述了 TorchRL 的組件和依賴關係,例如tensordict.TensorDicttensordict.nn.TensorDictModules,儘管它應該足夠透明,以便在不深入了解這些類別的情況下理解。

注意

我們的目的不是提供該演算法的 SOTA 實作,而是提供 TorchRL 損失實作和庫功能的概要說明,這些功能將在此演算法的上下文中使用。

導入和設定

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果可用,我們將在 CUDA 上執行策略

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule

TorchRL 提供了一系列損失,可在您的訓練腳本中使用。目的是擁有易於重複使用/交換且具有簡單簽名的損失。

TorchRL 損失的主要特徵是

  • 它們是有狀態的物件:它們包含可訓練參數的副本,因此loss_module.parameters()提供了訓練演算法所需的任何內容。

  • 它們遵循TensorDict約定:torch.nn.Module.forward()方法將接收一個 TensorDict 作為輸入,其中包含返回損失值所需的所有資訊。

    >>> data = replay_buffer.sample()
    >>> loss_dict = loss_module(data)
    
  • 它們輸出一個tensordict.TensorDict實例,其中損失值寫在"loss_<smth>"下,其中smth是一個描述損失的字串。 TensorDict中的其他鍵可能是訓練期間記錄的有用指標。

    注意

    我們返回獨立損失的原因是讓使用者可以為不同的參數集使用不同的最佳化器。可以簡單地透過以下方式對損失求和

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

__init__方法

所有損失的父類別是LossModule。與該庫的許多其他組件一樣,其forward()方法期望輸入一個從體驗重播緩衝區或任何類似資料結構中取樣的tensordict.TensorDict實例。使用這種格式可以跨模態重複使用該模組,或在模型需要讀取多個條目的複雜環境中使用。換句話說,它允許我們編寫一個損失模組,該模組不知道正在給它的資料類型,並且專注於運行損失函數的基本步驟,並且僅限於這些步驟。

為了使本教學盡可能具有教育意義,我們將獨立顯示該類的每個方法,並且稍後將填充該類。

讓我們從 __init__() 方法開始。DDPG 的目標是以一個簡單的策略解決控制任務:訓練一個策略來輸出動作,以最大化由價值網路預測的價值。因此,我們的損失模組需要在其建構子中接收兩個網路:一個 actor 和一個 value 網路。我們期望這兩者都是與 TensorDict 相容的物件,例如 tensordict.nn.TensorDictModule。我們的損失函數需要計算一個目標價值,並將價值網路擬合到這個目標價值,並且產生一個動作並擬合策略,以便使其價值估計最大化。

LossModule.__init__() 方法的關鍵步驟是呼叫 convert_to_functional()。這個方法會從模組中提取參數,並將其轉換為 functional 模組。嚴格來說,這不是必要的,並且可以完美地編寫所有沒有它的損失函數。但是,我們鼓勵使用它,原因如下。

TorchRL 這樣做的原因是,RL 演算法經常使用不同的參數集來執行相同的模型,這些參數集稱為 “trainable” 和 “target” 參數。“Trainable” 參數是最佳化器需要擬合的參數。“Target” 參數通常是前者的副本,具有一些時間延遲(絕對或透過移動平均稀釋)。這些目標參數用於計算與下一個觀測相關聯的價值。使用一組目標參數來進行價值模型,而不完全匹配目前配置的優點之一是,它們提供了正在計算的價值函數的悲觀界限。請注意下面的 create_target_params 關鍵字參數:這個參數告訴 convert_to_functional() 方法在損失模組中建立一組目標參數,用於目標價值計算。如果這被設置為 False(例如,參見 actor 網路),則 target_actor_network_params 屬性仍然可以訪問,但這只會返回 actor 參數的分離 (detached) 版本。

稍後,我們將看到如何在 TorchRL 中更新目標參數。

from tensordict.nn import TensorDictModule, TensorDictSequential


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

價值估計器損失方法

在許多 RL 演算法中,價值網路(或 Q-value 網路)是基於經驗價值估計來訓練的。這可以是 bootstrapped (TD(0),低變異數,高偏差),意味著目標價值是使用下一個獎勵而不是其他東西來獲得的,或者可以獲得 Monte-Carlo 估計 (TD(1)),在這種情況下,將使用即將到來的獎勵的整個序列(高變異數,低偏差)。也可以使用中間估計器 (TD(\(\lambda\))) 來折衷偏差和變異數。TorchRL 可以透過 ValueEstimators Enum 類別輕鬆地使用其中一個或另一個估計器,該類別包含指向所有已實作的價值估計器的指標。讓我們在這裡定義預設價值函數。我們將採用最簡單的版本 (TD(0)),並在稍後展示如何更改它。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我們還需要給 DDPG 一些關於如何建構價值估計器的指示,具體取決於使用者的查詢。根據提供的估計器,我們將建構相應的模組,以便在訓練時使用。

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

make_value_estimator 方法可以但不需要被呼叫:如果沒有,LossModule 將使用其預設估計器查詢此方法。

Actor 損失方法

RL 演算法的核心部分是 actor 的訓練損失。在 DDPG 的情況下,這個函數非常簡單:我們只需要計算與使用策略計算的動作相關聯的價值,並最佳化 actor 權重以最大化這個價值。

在計算這個價值時,我們必須確保將價值參數從圖表中取出,否則 actor 和價值損失將會混淆。為此,可以使用 hold_out_params() 函數。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

價值損失方法

我們現在需要最佳化我們的價值網路參數。為此,我們將依賴於我們類別的價值估計器。

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

將事物放在一個 forward 呼叫中

唯一缺少的部分是 forward 方法,它將價值和 actor 損失粘合在一起,收集成本值並將它們寫入傳遞給使用者的 TensorDict 中。

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

現在我們有了損失函數,我們可以使用它來訓練一個策略來解決控制任務。

環境

在大多數演算法中,首先需要處理的是環境的建構,因為它決定了訓練腳本的其餘部分。

對於這個範例,我們將使用 "cheetah" 任務。目標是讓半獵豹跑得盡可能快。

在 TorchRL 中,可以透過依賴 dm_controlgym 來建立這樣的任務。

env = GymEnv("HalfCheetah-v4")

env = DMControlEnv("cheetah", "run")

預設情況下,這些環境會禁用渲染。從狀態訓練通常比從圖像訓練更容易。為了簡化起見,我們只專注於從狀態學習。要將像素傳遞給由 env.step() 收集的 tensordicts,只需將 from_pixels=True 參數傳遞給建構子。

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我們編寫一個 make_env() 輔助函數,它將使用上面考慮的兩個後端之一 (dm-controlgym) 建立一個環境。

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

轉換

現在我們有了一個基礎環境,我們可能想要修改它的表示,使其更適合策略。在 TorchRL 中,轉換會以專門的 torchr.envs.TransformedEnv 類別附加到基礎環境。

  • 在 DDPG 中,常見的做法是使用一些啟發式的值來重新縮放獎勵。在這個例子中,我們將獎勵乘以 5。

  • 如果我們使用 dm_control,建立一個模擬器和腳本之間的介面也很重要,模擬器使用雙精度浮點數,而我們的腳本可能使用單精度浮點數。這種轉換是雙向的:當調用 env.step() 時,我們的動作需要以雙精度浮點數表示,並且輸出需要轉換為單精度浮點數。DoubleToFloat 轉換的作用正是如此:in_keys 列表指的是需要從雙精度轉換為單精度的鍵,而 in_keys_inv 則指的是在傳遞給環境之前需要轉換為雙精度的鍵。

  • 我們使用 CatTensors 轉換將狀態鍵連接在一起。

  • 最後,我們也保留了狀態標準化的可能性:我們將在稍後計算標準化常數。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

平行執行

以下輔助函數允許我們平行運行環境。平行運行環境可以顯著加快收集吞吐量。當使用轉換後的環境時,我們需要選擇是要為每個環境單獨執行轉換,還是集中數據並批量轉換。這兩種方法都很容易編碼

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

為了利用 PyTorch 的向量化功能,我們採用第一種方法

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip 將多個步驟與單個動作批量處理。如果 > 1,則需要調整其他幀計數(例如,frames_per_batch、total_frames),以使跨實驗收集的總幀數保持一致。這很重要,因為提高 frame-skip 但保持總幀數不變可能看起來像作弊:在所有條件都相同的情況下,以 frame-skip 為 2 收集的 10M 元素數據集和以 frame-skip 為 1 收集的另一個數據集,實際上與環境的交互比率為 2:1!簡而言之,在處理幀跳過時,應謹慎對待訓練腳本的幀計數,因為這可能導致訓練策略之間的比較產生偏差。

縮放獎勵有助於我們控制訊號幅度,以實現更有效的學習。

reward_scaling = 5.0

我們也定義何時截斷一個軌跡。對於 cheetah 任務,一千步(如果 frame-skip = 2,則為 500 步)是一個很好的數字。

max_frames_per_traj = 500

觀察值的標準化

為了計算標準化統計信息,我們在環境中運行任意數量的隨機步驟,並計算收集到的觀察值的平均值和標準差。ObservationNorm.init_stats() 方法可以用於此目的。為了獲得摘要統計信息,我們創建一個虛擬環境並運行給定的步數,在給定的步數中收集數據並計算其摘要統計信息。

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

標準化統計

使用 ObservationNorm 進行統計計算時使用的隨機步驟數

init_env_steps = 5000

transform_state_dict = get_env_stats()

每個數據收集器中的環境數量

env_per_collector = 4

我們將之前計算的統計信息傳遞給我們的環境輸出進行標準化

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import CompositeSpec

建立模型

現在我們轉向模型的設置。正如我們所看到的,DDPG 需要一個價值網絡,經過訓練以估計狀態-動作對的價值,以及一個參數化的演員,學習如何選擇使該價值最大化的動作。

回想一下,建立 TorchRL 模塊需要兩個步驟

  • 編寫將用作網絡的 torch.nn.Module

  • 將網絡包裝在 tensordict.nn.TensorDictModule 中,其中通過指定輸入和輸出鍵來處理數據流。

在更複雜的場景中,也可以使用 tensordict.nn.TensorDictSequential

Q-Value 網絡被包裝在 ValueOperator 中,它會自動將 out_keys 設置為 q-value 網絡的 "state_action_value,以及其他價值網絡的 state_value

TorchRL 提供了一個內置版本的 DDPG 網絡,如原始論文中所述。這些可以在 DdpgMlpActorDdpgMlpQNet 下找到。

由於我們使用惰性模塊,因此必須在能夠將策略從設備移動到設備並實現其他操作之前,將惰性模塊實體化。因此,最好使用小樣本數據運行模塊。為此,我們從環境規範生成虛假數據。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessModule,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=CompositeSpec(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)

探索

如原始論文中建議的,策略被傳遞到 OrnsteinUhlenbeckProcessModule 探索模塊中。讓我們定義 OU 噪聲達到其最小值之前的幀數

annealing_frames = 1_000_000

actor_model_explore = TensorDictSequential(
    actor,
    OrnsteinUhlenbeckProcessModule(
        spec=actor.spec.clone(),
        annealing_num_steps=annealing_frames,
    ).to(device),
)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

數據收集器

TorchRL 提供了專門的類,可以幫助您通過在環境中執行策略來收集數據。這些“數據收集器”迭代地計算在給定時間要執行的動作,然後在環境中執行一個步驟,並在需要時重置它。數據收集器的設計旨在幫助開發人員嚴格控制每批數據的幀數、此收集的(異步)性質以及分配給數據收集的資源(例如 GPU、工作人員數量等等)。

在這裡,我們將使用 SyncDataCollector,一個簡單的單進程數據收集器。TorchRL 提供了其他收集器,例如 MultiaSyncDataCollector,它以異步方式執行 rollout(例如,在優化策略時將收集數據,從而將訓練和數據收集解耦)。

要指定的參數是

  • 環境工廠或環境,

  • 策略,

  • 在收集器被認為是空之前的總幀數,

  • 每個軌跡的最大幀數(對於非終止環境(如 dm_control 環境)很有用)。

    注意

    傳遞給收集器的 max_frames_per_traj 會產生註冊新的 StepCounter 轉換,並用於推論的環境。我們可以手動達成相同的結果,就像我們在這個腳本中所做的一樣。

也應該傳遞

  • 每個收集批次中的影格數量、

  • 獨立於策略執行的隨機步驟數、

  • 用於策略執行的裝置

  • 以及在資料傳遞到主程序之前,用於儲存資料的裝置。

我們在訓練期間使用的總影格數應約為 100 萬。

total_frames = 10_000  # 1_000_000

收集器在外部迴圈每次迭代中傳回的影格數等於每個子軌跡的長度乘以每個收集器中並行運行的環境數量。

換句話說,我們預期收集器的批次具有 [env_per_collector, traj_len] 的形狀,其中 traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

評估器:建立您的記錄器物件

由於訓練資料是使用某種探索策略獲得的,因此我們需要以確定性模式評估演算法的真實效能。我們使用一個專用類別 Recorder 來做到這一點,它以給定的頻率在環境中執行策略,並傳回從這些模擬中獲得的一些統計資料。

以下輔助函式可建立此物件

from torchrl.trainers import Recorder


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = Recorder(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.DETERMINISTIC,
        record_interval=record_interval,
    )
    return recorder_obj

我們將記錄每收集 10 個批次的效能

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

重播緩衝區

重播緩衝區有兩種:優先的(其中一些錯誤訊號用於使某些項目比其他項目更有可能被取樣)和常規的、循環經驗重播。

TorchRL 重播緩衝區是可組合的:可以選擇儲存、取樣和寫入策略。也可以使用記憶體映射陣列將張量儲存在實體記憶體上。以下函式負責使用所需的超參數建立重播緩衝區

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我們將把重播緩衝區儲存在磁碟上的臨時目錄中

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

重播緩衝區儲存和批次大小

TorchRL 重播緩衝區會計算沿著第一個維度的元素數量。由於我們會將軌跡饋送到緩衝區,因此我們需要將緩衝區大小除以資料收集器產生的子軌跡長度來調整緩衝區大小。關於批次大小,我們的取樣策略將包括取樣長度為 traj_len=200 的軌跡,然後選擇長度為 random_crop_len=25 的子軌跡,並在這些子軌跡上計算損失。此策略平衡了儲存一定長度的完整軌跡的選擇與為我們的損失提供具有足夠異質性的樣本的需求。下圖顯示了資料流,該資料流來自於在每個批次中獲得 8 個影格且並行運行 2 個環境的收集器,將它們饋送到包含 1000 個軌跡的重播緩衝區,並取樣每個時間步長為 2 的子軌跡。

Storing trajectories in the replay buffer

讓我們先從儲存在緩衝區中的影格數開始

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

預設情況下,優先重播緩衝區已停用

prb = False

我們還需要定義每個收集的資料批次要執行多少次更新。這被稱為 update-to-data 或 UTD 比率

update_to_data = 64

我們將使用長度為 25 的軌跡饋送損失

random_crop_len = 25

在原始論文中,作者針對每個收集的影格使用一批 64 個元素執行一次更新。在這裡,我們重現了相同的比率,但同時在每個批次收集時實現了幾次更新。我們調整我們的批次大小以達到相同的 update-per-frame 比率

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

損失模組建構

我們使用剛建立的 actor 和 qnet 建構我們的損失模組。因為我們有目標參數要更新,所以我們 _必須_ 建立目標網路更新器。

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

讓我們使用 TD(lambda) 估計器!

loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda)

注意

Off-policy 通常規定使用 TD(0) 估計器。在這裡,我們使用 TD(\(\lambda\)) 估計器,由於遵循特定狀態的軌跡是使用過時的策略收集的,因此會引入一些偏差。這個技巧,以及可以在資料收集期間使用的多步技巧,是「hack」的替代版本,我們通常發現這些技巧在實踐中效果很好,儘管它們在回報估計中引入了一些偏差。

目標網路更新器

目標網路是 off-policy RL 演算法的關鍵部分。由於 HardUpdateSoftUpdate 類別,更新目標網路參數變得容易。它們以損失模組作為參數來建立,並且通過在訓練迴圈中的適當位置呼叫 updater.step() 來實現更新。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

最佳化器

最後,我們將使用 Adam 最佳化器來優化策略和價值網路

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

開始訓練策略

現在我們已經建立了所有需要的模組,訓練迴圈非常簡單。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore[1].step(current_frames)

collector.shutdown()
del collector
  0%|          | 0/10000 [00:00<?, ?it/s]
  8%|8         | 800/10000 [00:00<00:05, 1661.43it/s]
 16%|#6        | 1600/10000 [00:02<00:17, 485.69it/s]
 24%|##4       | 2400/10000 [00:03<00:10, 700.39it/s]
 32%|###2      | 3200/10000 [00:03<00:07, 902.67it/s]
 40%|####      | 4000/10000 [00:04<00:05, 1069.24it/s]
 48%|####8     | 4800/10000 [00:04<00:04, 1211.98it/s]
 56%|#####6    | 5600/10000 [00:05<00:03, 1323.34it/s]
reward: -2.39 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-2.66/6.70, grad norm= 74.38, loss_value= 523.18, loss_actor= 16.09, target value: -16.30:  56%|#####6    | 5600/10000 [00:06<00:03, 1323.34it/s]
reward: -2.39 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-2.66/6.70, grad norm= 74.38, loss_value= 523.18, loss_actor= 16.09, target value: -16.30:  64%|######4   | 6400/10000 [00:06<00:04, 890.40it/s]
reward: -0.14 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-2.44/5.41, grad norm= 111.10, loss_value= 194.69, loss_actor= 12.23, target value: -14.96:  64%|######4   | 6400/10000 [00:07<00:04, 890.40it/s]
reward: -0.14 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-2.44/5.41, grad norm= 111.10, loss_value= 194.69, loss_actor= 12.23, target value: -14.96:  72%|#######2  | 7200/10000 [00:08<00:03, 721.06it/s]
reward: -3.69 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-2.90/5.73, grad norm= 354.29, loss_value= 294.10, loss_actor= 17.08, target value: -19.76:  72%|#######2  | 7200/10000 [00:09<00:03, 721.06it/s]
reward: -3.69 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-2.90/5.73, grad norm= 354.29, loss_value= 294.10, loss_actor= 17.08, target value: -19.76:  80%|########  | 8000/10000 [00:10<00:03, 641.86it/s]
reward: -4.01 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-3.23/5.05, grad norm= 151.43, loss_value= 212.15, loss_actor= 18.62, target value: -20.38:  80%|########  | 8000/10000 [00:10<00:03, 641.86it/s]
reward: -4.01 (r0 = -2.56), reward eval: reward:  0.00, reward normalized=-3.23/5.05, grad norm= 151.43, loss_value= 212.15, loss_actor= 18.62, target value: -20.38:  88%|########8 | 8800/10000 [00:11<00:02, 598.09it/s]
reward: -5.22 (r0 = -2.56), reward eval: reward: -3.73, reward normalized=-2.95/4.88, grad norm= 72.95, loss_value= 167.81, loss_actor= 17.47, target value: -19.98:  88%|########8 | 8800/10000 [00:14<00:02, 598.09it/s]
reward: -5.22 (r0 = -2.56), reward eval: reward: -3.73, reward normalized=-2.95/4.88, grad norm= 72.95, loss_value= 167.81, loss_actor= 17.47, target value: -19.98:  96%|#########6| 9600/10000 [00:15<00:00, 402.51it/s]
reward: -5.42 (r0 = -2.56), reward eval: reward: -3.73, reward normalized=-3.03/5.25, grad norm= 95.94, loss_value= 229.37, loss_actor= 18.00, target value: -21.38:  96%|#########6| 9600/10000 [00:15<00:00, 402.51it/s]
reward: -5.42 (r0 = -2.56), reward eval: reward: -3.73, reward normalized=-3.03/5.25, grad norm= 95.94, loss_value= 229.37, loss_actor= 18.00, target value: -21.38: : 10400it [00:17, 363.35it/s]
reward: -3.54 (r0 = -2.56), reward eval: reward: -3.73, reward normalized=-3.36/4.25, grad norm= 48.75, loss_value= 178.35, loss_actor= 19.60, target value: -23.75: : 10400it [00:18, 363.35it/s]

實驗結果

我們繪製了訓練期間的平均獎勵的簡單圖表。我們可以觀察到我們的策略學習得很好,可以解決這個任務。

注意

如上所述,為了獲得更合理的效能,請使用更大的 total_frames 值,例如 100 萬。

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()
coding ddpg

結論

在本教程中,我們學習了如何在給定 DDPG 的具體範例的情況下,在 TorchRL 中編寫損失模組。

主要的重點是

  • 如何使用 LossModule 類別來編寫新的損失元件;

  • 如何使用(或不使用)目標網路,以及如何更新其參數;

  • 如何建立與損失模組相關聯的最佳化器。

下一步

為了在這個損失模組上進一步迭代,我們可能會考慮

腳本的總運行時間:(0 分鐘 28.653 秒)

由 Sphinx-Gallery 產生

文件

存取 PyTorch 的完整開發人員文件

檢視文件

教學

取得初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並獲得您問題的解答

檢視資源