MPPIPlanner¶
- class torchrl.modules.MPPIPlanner(*args, **kwargs)[source]¶
MPPI 規劃器模組。
- 參考
使用共變數變數重要性進行模型預測路徑積分控制
抽樣。(Williams, G., Aldrich, A., and Theodorou, E. A.) https://arxiv.org/abs/1509.01149 - 模型預測控制的時間差分學習
(Hansen N., Wang X., Su H.) https://arxiv.org/abs/2203.04955
當給定包含初始狀態的 TensorDict 時,此模組將執行 MPPI 規劃步驟。
呼叫模組會傳回根據經驗最大化給定規劃範圍內回報的動作
- 參數:
env (EnvBase) – 用於執行規劃步驟的環境 (可以是 ModelBasedEnv 或
EnvBase
)。planning_horizon (int) – 模擬軌跡的長度
optim_steps (int) – MPC 規劃器使用的優化步驟數
num_candidates (int) – 從高斯分佈中抽樣的候選數。
top_k (int) – 用於更新高斯分佈的平均值和標準差的頂部候選數。
reward_key (str, optional) – 用於檢索獎勵的 TensorDict 中的鍵。預設為“reward”。
action_key (str, optional) – 用於儲存動作的 TensorDict 中的鍵。預設為“action”
範例
>>> from tensordict import TensorDict >>> from torchrl.data import Composite, Unbounded >>> from torchrl.envs.model_based import ModelBasedEnvBase >>> from tensordict.nn import TensorDictModule >>> from torchrl.modules import ValueOperator >>> from torchrl.objectives.value import TDLambdaEstimator >>> class MyMBEnv(ModelBasedEnvBase): ... def __init__(self, world_model, device="cpu", dtype=None, batch_size=None): ... super().__init__(world_model, device=device, dtype=dtype, batch_size=batch_size) ... self.state_spec = Composite( ... hidden_observation=Unbounded((4,)) ... ) ... self.observation_spec = Composite( ... hidden_observation=Unbounded((4,)) ... ) ... self.action_spec = Unbounded((1,)) ... self.reward_spec = Unbounded((1,)) ... ... def _reset(self, tensordict: TensorDict) -> TensorDict: ... tensordict = TensorDict( ... {}, ... batch_size=self.batch_size, ... device=self.device, ... ) ... tensordict = tensordict.update( ... self.full_state_spec.rand()) ... tensordict = tensordict.update( ... self.full_action_spec.rand()) ... tensordict = tensordict.update( ... self.full_observation_spec.rand()) ... return tensordict ... >>> from torchrl.modules import MLP, WorldModelWrapper >>> import torch.nn as nn >>> world_model = WorldModelWrapper( ... TensorDictModule( ... MLP(out_features=4, activation_class=nn.ReLU, activate_last_layer=True, depth=0), ... in_keys=["hidden_observation", "action"], ... out_keys=["hidden_observation"], ... ), ... TensorDictModule( ... nn.Linear(4, 1), ... in_keys=["hidden_observation"], ... out_keys=["reward"], ... ), ... ) >>> env = MyMBEnv(world_model) >>> value_net = nn.Linear(4, 1) >>> value_net = ValueOperator(value_net, in_keys=["hidden_observation"]) >>> adv = TDLambdaEstimator( ... gamma=0.99, ... lmbda=0.95, ... value_network=value_net, ... ) >>> # Build a planner and use it as actor >>> planner = MPPIPlanner( ... env, ... adv, ... temperature=1.0, ... planning_horizon=10, ... optim_steps=11, ... num_candidates=7, ... top_k=3) >>> env.rollout(5, planner) TensorDict( fields={ action: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.float32, is_shared=False), done: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False), hidden_observation: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False), next: TensorDict( fields={ done: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False), hidden_observation: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False), reward: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.float32, is_shared=False), terminated: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5]), device=cpu, is_shared=False), terminated: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5]), device=cpu, is_shared=False)