• 文件 >
  • TorchRL 目標:編寫 DDPG 損失函數
捷徑

TorchRL 目標:編寫 DDPG 損失函數

作者: Vincent Moens

概述

TorchRL 將 RL 演算法的訓練分成各種部分,這些部分將在您的訓練腳本中組合在一起:環境、資料收集和儲存、模型以及最終的損失函數。

TorchRL 損失函數(或「目標」)是有狀態的物件,包含可訓練的參數(策略和價值模型)。本教學課程將引導您完成從頭開始使用 TorchRL 編寫損失函數的步驟。

為此,我們將專注於 DDPG,這是一種相對簡單的演算法。 深度確定性策略梯度 (DDPG) 是一種簡單的連續控制演算法。它包括學習動作-觀察配對的參數價值函數,然後學習一種策略,該策略輸出在給定特定觀察時最大化此價值函數的動作。

您將學到的內容

  • 如何編寫損失函數模組並自訂其價值估算器;

  • 如何在 TorchRL 中建構環境,包括轉換(例如,資料正規化)和平行執行;

  • 如何設計策略和價值網路;

  • 如何有效地從您的環境收集資料並將其儲存在重播緩衝區中;

  • 如何在您的重播緩衝區中儲存軌跡(而不是轉移);

  • 如何評估您的模型。

先決條件

本教學課程假設您已完成 PPO 教學課程,其中概述了 TorchRL 的元件和依賴項,例如 tensordict.TensorDicttensordict.nn.TensorDictModules,儘管它應該足夠透明,以便在不深入了解這些類別的情況下也能理解。

注意

我們的目標不是提供該演算法的 SOTA 實作,而是提供 TorchRL 損失函數實作以及將在此演算法的上下文中使用的函式庫功能的高階說明。

匯入和設定

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果可用,我們將在 CUDA 上執行策略

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule

TorchRL 提供了一系列損失函數,可在您的訓練腳本中使用。 目標是擁有易於重複使用/交換且具有簡單簽名的損失函數。

TorchRL 損失函數的主要特性是

  • 它們是有狀態的物件:它們包含可訓練參數的副本,因此 loss_module.parameters() 提供訓練演算法所需的任何內容。

  • 它們遵循 TensorDict 慣例: torch.nn.Module.forward() 方法將接收一個 TensorDict 作為輸入,其中包含傳回損失值所需的所有資訊。

    >>> data = replay_buffer.sample()
    >>> loss_dict = loss_module(data)
    
  • 它們會輸出一個 tensordict.TensorDict 實例,其中損失值寫在 "loss_<smth>" 下,其中 smth 是一個描述損失的字串。 TensorDict 中的其他鍵可能是訓練期間記錄的有用指標。

    注意

    我們返回獨立損失的原因是讓使用者可以為不同的參數集使用不同的優化器。損失的總和可以簡單地透過以下方式完成:

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

__init__ 方法

所有損失的父類別是 LossModule。與該庫的許多其他元件一樣,其 forward() 方法期望輸入一個從經驗回放緩衝區或任何類似資料結構中採樣的 tensordict.TensorDict 實例。 使用這種格式可以跨模態重複使用該模組,或在模型需要讀取多個條目的複雜設定中使用。換句話說,它允許我們編寫一個損失模組,該模組不知道給定的資料類型,而只專注於運行損失函數的基本步驟,僅此而已。

為了使本教程盡可能具有教學性,我們將獨立顯示該類的每個方法,並在稍後階段填充該類。

讓我們從 __init__() 方法開始。 DDPG 旨在透過一個簡單的策略來解決控制任務:訓練一個策略來輸出最大化價值網路預測的動作。因此,我們的損失模組需要在其建構子中接收兩個網路:一個 actor 和一個 value 網路。我們期望這兩者都是與 TensorDict 相容的物件,例如 tensordict.nn.TensorDictModule。我們的損失函數將需要計算一個目標值並使 value 網路適應該目標值,並產生一個動作並使策略適應,使其價值估計最大化。

LossModule.__init__() 方法的關鍵步驟是呼叫 convert_to_functional()。此方法將從模組中提取參數並將其轉換為 functional 模組。 嚴格來說,這不是必需的,並且可以完全編寫所有沒有它的損失。 但是,我們鼓勵使用它的原因如下。

TorchRL 這樣做的原因是 RL 演算法經常使用不同的參數集(稱為「可訓練」和「目標」參數)執行相同的模型。「可訓練」參數是優化器需要擬合的參數。「目標」參數通常是前者的副本,具有一定的時間延遲(絕對的或透過移動平均數稀釋)。這些目標參數用於計算與下一個觀察相關聯的值。使用一組與目前配置不完全匹配的 value 模型的目標參數的優點之一是,它們提供了正在計算的 value 函數的悲觀界限。請注意下面的 create_target_params 關鍵字參數:此參數告訴 convert_to_functional() 方法在損失模組中建立一組用於目標 value 計算的目標參數。如果將其設定為 False(例如,請參閱 actor 網路),則 target_actor_network_params 屬性仍然可以訪問,但這只會傳回 actor 參數的**分離**版本。

稍後,我們將看到如何在 TorchRL 中更新目標參數。

from tensordict.nn import TensorDictModule, TensorDictSequential


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

value 估計器損失方法

在許多 RL 演算法中,value 網路(或 Q-value 網路)是基於經驗 value 估計訓練的。這可以是 bootstrapping 的(TD(0),低變異數、高偏差),意味著目標 value 是使用下一個獎勵獲得的,沒有其他東西,或者可以獲得蒙地卡羅估計 (TD(1)),在這種情況下,將使用即將到來的獎勵的整個序列(高變異數,低偏差)。也可以使用中間估計器 (TD(\(\lambda\))) 來折衷偏差和變異數。TorchRL 使您可以輕鬆地透過 ValueEstimators 列舉類別使用一個或另一個估計器,該列舉類別包含指向所有已實作的 value 估計器的指標。讓我們在這裡定義預設 value 函數。 我們將採用最簡單的版本 (TD(0)),並在稍後展示如何更改它。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我們還需要向 DDPG 提供有關如何根據使用者查詢建立 value 估計器的說明。根據提供的估計器,我們將建立在訓練時間使用的相應模組

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

可以但不一定要呼叫 make_value_estimator 方法:如果沒有,LossModule 將使用其預設估計器查詢此方法。

actor 損失方法

RL 演算法的核心部分是 actor 的訓練損失。在 DDPG 的情況下,此函數非常簡單:我們只需要計算與使用策略計算的動作相關聯的 value,並優化 actor 權重以最大化此 value。

在計算此 value 時,我們必須確保將 value 參數從圖中取出,否則 actor 和 value 損失將混合在一起。為此,可以使用 hold_out_params() 函數。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

value 損失方法

我們現在需要優化我們的 value 網路參數。為此,我們將依賴於我們類的 value 估計器

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

將所有內容放在一起進行 forward 呼叫

唯一缺少的部分是 forward 方法,它將 value 和 actor 損失粘合在一起,收集成本值並將它們寫入傳遞給使用者的 TensorDict 中。

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

現在我們有了損失,我們可以利用它來訓練策略以解決控制任務。

環境

在大多數演算法中,首先需要關注的是環境的建構,因為它決定了訓練腳本的其餘部分。

在這個範例中,我們將使用 "cheetah" 任務。目標是讓半人馬獵豹盡可能快速地奔跑。

在 TorchRL 中,可以透過依賴 dm_controlgym 來建立這樣的任務

env = GymEnv("HalfCheetah-v4")

或者

env = DMControlEnv("cheetah", "run")

預設情況下,這些環境會停用渲染。從狀態進行訓練通常比從圖像進行訓練更容易。為了簡化起見,我們只專注於從狀態進行學習。要將像素傳遞給由 env.step() 收集的 tensordicts,只需將 from_pixels=True 參數傳遞給建構函式即可

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我們編寫一個 make_env() 輔助函式,它將使用上面考慮的兩個後端(dm-controlgym)之一來建立環境。

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

轉換 (Transforms)

現在我們有了基礎環境,我們可能希望修改其表示形式,使其對策略更加友善。在 TorchRL 中,轉換會附加到專用的 torchr.envs.TransformedEnv 類別中的基礎環境。

  • 在 DDPG 中,通常會使用一些啟發式值來重新縮放獎勵。在這個範例中,我們將獎勵乘以 5。

  • 如果我們使用 dm_control,那麼建立一個在處理雙精度數字的模擬器和我們可能使用單精度的腳本之間的介面也很重要。這種轉換是雙向的:當呼叫 env.step() 時,我們的動作需要以雙精度表示,並且輸出需要轉換為單精度。DoubleToFloat 轉換正是這樣做的:in_keys 列表指的是需要從雙精度轉換為浮點數的鍵,而 in_keys_inv 則指的是那些在傳遞給環境之前需要轉換為雙精度的鍵。

  • 我們使用 CatTensors 轉換將狀態鍵連接在一起。

  • 最後,我們也保留了標準化狀態的可能性:我們將在稍後負責計算標準化常數。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

平行執行 (Parallel execution)

以下輔助函式允許我們平行執行環境。平行執行環境可以顯著加快收集吞吐量。當使用轉換後的環境時,我們需要選擇是希望為每個環境個別執行轉換,還是集中資料並批量轉換。這兩種方法都易於編碼

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

為了利用 PyTorch 的向量化功能,我們採用第一種方法

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip 將多個步驟與單個動作批量處理。如果 > 1,則需要調整其他幀計數(例如,frames_per_batch、total_frames),以使跨實驗收集的幀總數保持一致。這很重要,因為提高 frame-skip 但保持幀總數不變可能看起來像作弊:所有事物相比之下,以 frame-skip 為 2 收集的 10M 元素資料集與以 frame-skip 為 1 收集的資料集實際上與環境的互動比率為 2:1!簡而言之,在處理 frame skipping 時,應該注意訓練腳本的幀數,因為這可能會導致訓練策略之間的比較產生偏差。

縮放獎勵有助於我們控制訊號幅度,以實現更有效的學習。

reward_scaling = 5.0

我們也定義了何時會截斷軌跡。一千步(如果 frame-skip = 2 則為 500)是 cheetah 任務的一個不錯的數字

max_frames_per_traj = 500

觀察的標準化 (Normalization of the observations)

為了計算標準化統計量,我們在環境中執行任意數量的隨機步驟,並計算收集的觀察值的平均值和標準差。ObservationNorm.init_stats() 方法可用於此目的。為了獲得摘要統計量,我們建立一個虛擬環境並執行給定的步數,在給定的步數中收集資料並計算其摘要統計量。

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

標準化統計量 (Normalization stats)

使用 ObservationNorm 進行統計量計算時使用的隨機步驟數

init_env_steps = 5000

transform_state_dict = get_env_stats()

每個資料收集器中的環境數量

env_per_collector = 4

我們傳遞先前計算的統計量來標準化我們環境的輸出

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import Composite

建立模型 (Building the model)

現在我們開始設定模型。正如我們所看到的,DDPG 需要一個價值網路,用於訓練以估計狀態-動作對的值,以及一個參數化 actor,用於學習如何選擇最大化此值的動作。

回想一下,建立 TorchRL 模組需要兩個步驟

  • 編寫將用作網路的 torch.nn.Module

  • 將網路包裝在 tensordict.nn.TensorDictModule 中,其中資料流由指定輸入和輸出鍵來處理。

在更複雜的場景中,也可以使用 tensordict.nn.TensorDictSequential

Q-Value 網路包裝在 ValueOperator 中,該網路會自動將 out_keys 設定為 q-value 網路的 "state_action_value 和其他價值網路的 state_value

TorchRL 提供原始論文中提出的 DDPG 網路的內建版本。這些可以在 DdpgMlpActorDdpgMlpQNet 下找到。

由於我們使用延遲模組,因此有必要在能夠將策略從裝置移動到裝置並完成其他操作之前實現延遲模組。因此,最好使用一個小型的資料樣本來執行模組。為此,我們從環境規格產生虛擬資料。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessModule,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=Composite(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)

探索 (Exploration)

策略傳遞到 OrnsteinUhlenbeckProcessModule 探索模組中,正如原始論文中所建議的那樣。讓我們定義 OU 雜訊達到其最小值之前的幀數

annealing_frames = 1_000_000

actor_model_explore = TensorDictSequential(
    actor,
    OrnsteinUhlenbeckProcessModule(
        spec=actor.spec.clone(),
        annealing_num_steps=annealing_frames,
    ).to(device),
)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

資料收集器 (Data collector)

TorchRL 提供了專用的類別,可協助您透過在環境中執行策略來收集資料。這些「資料收集器」會反覆運算以計算在給定時間要執行的動作,然後在環境中執行一個步驟,並在需要時重設它。資料收集器的設計旨在協助開發人員嚴格控制每批資料的幀數、此收集的 (非)同步性質以及分配給資料收集的資源(例如 GPU、工作人員數量等等)。

在這裡,我們將使用 SyncDataCollector,這是一個簡單的單進程資料收集器。TorchRL 提供了其他的收集器,例如 MultiaSyncDataCollector,它以非同步方式執行 rollouts(例如,在優化策略時會收集資料,從而將訓練和資料收集分開)。

需要指定的參數為:

  • 環境工廠或環境、

  • 策略、

  • 收集器被視為空之前的總幀數、

  • 每個軌跡的最大幀數(對於非終止環境(如 dm_control)很有用)。

    注意

    傳遞給收集器的 max_frames_per_traj 將具有註冊新的 StepCounter 轉換的效果,該轉換用於推論的環境。我們可以手動實現相同的結果,就像我們在這個腳本中所做的那樣。

還應該傳遞:

  • 每個收集批次中的幀數、

  • 獨立於策略執行的隨機步數、

  • 用於策略執行的裝置

  • 用於在資料傳遞到主進程之前儲存資料的裝置。

我們在訓練期間使用的總幀數應約為 1M。

total_frames = 10_000  # 1_000_000

收集器在外部迴圈的每次迭代中傳回的幀數等於每個子軌跡的長度乘以每個收集器中並行執行的環境數量。

換句話說,我們期望來自收集器的批次具有形狀 [env_per_collector, traj_len],其中 traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

評估器:建立您的記錄器物件

由於訓練資料是使用某些探索策略獲得的,因此我們需要以確定性模式評估演算法的真實效能。我們使用專用的類別 Recorder 來執行此操作,該類別以給定的頻率在環境中執行策略,並傳回從這些模擬中獲得的一些統計數據。

以下輔助函式建立此物件

from torchrl.trainers import Recorder


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = Recorder(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.DETERMINISTIC,
        record_interval=record_interval,
    )
    return recorder_obj

我們將記錄每收集 10 個批次的效能

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

重播緩衝區

重播緩衝區有兩種形式:優先級(其中使用一些錯誤訊號來給予某些項目比其他項目更高的採樣可能性)和常規的循環經驗重播。

TorchRL 重播緩衝區是可組合的:可以選擇儲存、採樣和寫入策略。也可以使用記憶體對應陣列將張量儲存在實體記憶體上。以下函式負責使用所需的超參數建立重播緩衝區

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我們將把重播緩衝區儲存在磁碟上的臨時目錄中

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

重播緩衝區儲存和批次大小

TorchRL 重播緩衝區計算沿第一個維度的元素數量。由於我們會將軌跡饋送到我們的緩衝區,因此我們需要透過將緩衝區大小除以資料收集器產生的子軌跡長度來調整緩衝區大小。關於批次大小,我們的採樣策略將包括採樣長度為 traj_len=200 的軌跡,然後選擇長度為 random_crop_len=25 的子軌跡,並在這些子軌跡上計算損失。此策略平衡了儲存特定長度的整個軌跡的選擇與為我們的損失提供具有足夠異質性的樣本的需求。下圖顯示了從每個批次中獲得 8 個幀並以 2 個並行運行的環境運行的收集器的資料流程,將它們饋送到包含 1000 個軌跡的重播緩衝區,並採樣每個 2 個時間步長的子軌跡。

Storing trajectories in the replay buffer

讓我們從儲存在緩衝區中的幀數開始

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

預設情況下已停用優先級重播緩衝區

prb = False

我們還需要定義為每個收集的資料批次執行多少次更新。這被稱為更新到資料或 UTD 比率

update_to_data = 64

我們將使用長度為 25 的軌跡饋送損失

random_crop_len = 25

在原始論文中,作者對收集的每個幀執行一次更新,批次大小為 64 個元素。在這裡,我們重現相同的比率,同時在每次批次收集時實現多次更新。我們調整我們的批次大小以實現相同的每個幀更新比率

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

損失模組建構

我們使用剛才建立的 actor 和 qnet 建立我們的損失模組。因為我們有要更新的目標參數,所以我們 _必須_ 建立目標網路更新器。

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

讓我們使用 TD(lambda) 估算器!

loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda)

注意

離線策略通常規定 TD(0) 估算器。在這裡,我們使用 TD(\(\lambda\)) 估算器,因為遵循特定狀態的軌跡是使用過時的策略收集的,這會引入一些偏差。這個技巧,以及可以在資料收集期間使用的多步驟技巧,都是「駭客」的替代版本,我們通常發現它們在實踐中效果很好,儘管它們在回報估算中引入了一些偏差。

目標網路更新器

目標網路是離線策略 RL 演算法的關鍵部分。由於 HardUpdateSoftUpdate 類別,更新目標網路參數變得容易。它們以損失模組作為參數構建,並且透過在訓練迴圈中的適當位置呼叫 updater.step() 來實現更新。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

最佳化器 (Optimizer)

最後,我們將使用 Adam 最佳化器來訓練策略網路和價值網路

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

開始訓練策略

現在我們已經建立了所有需要的模組,訓練迴圈非常簡單。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore[1].step(current_frames)

collector.shutdown()
del collector
  0%|          | 0/10000 [00:00<?, ?it/s]
  8%|▊         | 800/10000 [00:00<00:03, 2880.70it/s]
 16%|█▌        | 1600/10000 [00:01<00:09, 872.84it/s]
 24%|██▍       | 2400/10000 [00:01<00:06, 1257.62it/s]
 32%|███▏      | 3200/10000 [00:02<00:04, 1587.49it/s]
 40%|████      | 4000/10000 [00:02<00:03, 1856.09it/s]
 48%|████▊     | 4800/10000 [00:02<00:02, 2073.04it/s]
 56%|█████▌    | 5600/10000 [00:03<00:01, 2237.00it/s]
reward: -2.34 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-2.01/6.42, grad norm= 79.50, loss_value= 372.62, loss_actor= 12.92, target value: -13.97:  56%|█████▌    | 5600/10000 [00:04<00:01, 2237.00it/s]
reward: -2.34 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-2.01/6.42, grad norm= 79.50, loss_value= 372.62, loss_actor= 12.92, target value: -13.97:  64%|██████▍   | 6400/10000 [00:05<00:04, 864.27it/s]
reward: -2.26 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-2.58/5.56, grad norm= 154.12, loss_value= 228.92, loss_actor= 13.34, target value: -17.21:  64%|██████▍   | 6400/10000 [00:06<00:04, 864.27it/s]
reward: -2.26 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-2.58/5.56, grad norm= 154.12, loss_value= 228.92, loss_actor= 13.34, target value: -17.21:  72%|███████▏  | 7200/10000 [00:07<00:04, 612.71it/s]
reward: -4.77 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-1.90/6.40, grad norm= 131.08, loss_value= 344.77, loss_actor= 13.81, target value: -13.14:  72%|███████▏  | 7200/10000 [00:09<00:04, 612.71it/s]
reward: -4.77 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-1.90/6.40, grad norm= 131.08, loss_value= 344.77, loss_actor= 13.81, target value: -13.14:  80%|████████  | 8000/10000 [00:09<00:03, 512.64it/s]
reward: -4.79 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-2.79/5.54, grad norm= 76.23, loss_value= 215.53, loss_actor= 18.52, target value: -18.66:  80%|████████  | 8000/10000 [00:11<00:03, 512.64it/s]
reward: -4.79 (r0 = -1.62), reward eval: reward: -0.00, reward normalized=-2.79/5.54, grad norm= 76.23, loss_value= 215.53, loss_actor= 18.52, target value: -18.66:  88%|████████▊ | 8800/10000 [00:11<00:02, 461.42it/s]
reward: -5.21 (r0 = -1.62), reward eval: reward: -5.98, reward normalized=-2.79/5.03, grad norm= 99.13, loss_value= 200.90, loss_actor= 20.27, target value: -19.92:  88%|████████▊ | 8800/10000 [00:14<00:02, 461.42it/s]
reward: -5.21 (r0 = -1.62), reward eval: reward: -5.98, reward normalized=-2.79/5.03, grad norm= 99.13, loss_value= 200.90, loss_actor= 20.27, target value: -19.92:  96%|█████████▌| 9600/10000 [00:14<00:01, 370.60it/s]
reward: -4.69 (r0 = -1.62), reward eval: reward: -5.98, reward normalized=-3.77/4.74, grad norm= 158.99, loss_value= 172.62, loss_actor= 24.22, target value: -26.04:  96%|█████████▌| 9600/10000 [00:16<00:01, 370.60it/s]
reward: -4.69 (r0 = -1.62), reward eval: reward: -5.98, reward normalized=-3.77/4.74, grad norm= 158.99, loss_value= 172.62, loss_actor= 24.22, target value: -26.04: : 10400it [00:17, 340.85it/s]
reward: -4.96 (r0 = -1.62), reward eval: reward: -5.98, reward normalized=-3.26/4.77, grad norm= 330.53, loss_value= 227.84, loss_actor= 21.39, target value: -23.79: : 10400it [00:19, 340.85it/s]

實驗結果

我們繪製了訓練期間平均獎勵的簡單圖表。 我們可以觀察到我們的策略學得很好,能夠解決這個任務。

注意

如上所述,為了獲得更合理的性能,請使用更大的 total_frames 值,例如 1M。

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()
coding ddpg

結論

在本教學中,我們學習了如何在 TorchRL 中編寫損失模組,並以 DDPG 為具體例子。

主要的重點是:

  • 如何使用 LossModule 類別來編寫新的損失元件;

  • 如何使用(或不使用)目標網路,以及如何更新其參數;

  • 如何建立與損失模組相關聯的最佳化器。

下一步

為了進一步迭代這個損失模組,我們可以考慮:

腳本的總運行時間: (1 分鐘 51.854 秒)

估計的記憶體使用量: 330 MB

由 Sphinx-Gallery 產生的圖庫

文件

存取 PyTorch 的完整開發者文件

檢視文件

教學

取得初學者和高級開發者的深入教學

檢視教學

資源

尋找開發資源並獲得問題的解答

檢視資源