BraxWrapper¶
- torchrl.envs.BraxWrapper(*args, **kwargs)[原始碼]¶
Google Brax 環境封裝器。
Brax 提供基於 Jax 的向量化和可微分模擬框架。TorchRL 的封裝器會產生一些 jax 到 torch 轉換的額外負擔,但計算圖仍然可以在模擬的軌跡之上建立,從而允許透過 rollout 進行反向傳播。
GitHub: https://github.com/google/brax
論文: https://arxiv.org/abs/2106.13281
- 參數:
env (brax.envs.base.PipelineEnv) – 要封裝的環境。
categorical_action_encoding (bool, optional) – 如果
True
,categorical 規格將轉換為 TorchRL 等效項 (torchrl.data.Categorical
),否則將使用 one-hot 編碼 (torchrl.data.OneHot
)。預設為False
。
- 關鍵字參數:
from_pixels (bool, optional) – 尚未支援。
frame_skip (int, optional) – 如果提供,表示要重複執行相同動作的步數。傳回的觀察將是序列的最後一個觀察,而獎勵將是跨步數的獎勵總和。
device (torch.device, optional) – 如果提供,則為要將資料轉換到的裝置。預設為
torch.device("cpu")
。batch_size (torch.Size, optional) – 環境的批次大小。在
brax
中,這表示向量化環境的數量。預設為torch.Size([])
。allow_done_after_reset (bool, optional) – 如果
True
,則在呼叫reset()
後,允許 envs 立即done
。預設為False
。
- 變數:
available_envs – 可用於建置的環境
範例
>>> import brax.envs >>> from torchrl.envs import BraxWrapper >>> import torch >>> device = "cuda" if torch.cuda.is_available() else "cpu" >>> base_env = brax.envs.get_environment("ant") >>> env = BraxWrapper(base_env, device=device) >>> env.set_seed(0) >>> td = env.reset() >>> td["action"] = env.action_spec.rand() >>> td = env.step(td) >>> print(td) TensorDict( fields={ action: Tensor(torch.Size([8]), dtype=torch.float32), done: Tensor(torch.Size([1]), dtype=torch.bool), next: TensorDict( fields={ observation: Tensor(torch.Size([87]), dtype=torch.float32)}, batch_size=torch.Size([]), device=cpu, is_shared=False), observation: Tensor(torch.Size([87]), dtype=torch.float32), reward: Tensor(torch.Size([1]), dtype=torch.float32), state: TensorDict(...)}, batch_size=torch.Size([]), device=cpu, is_shared=False) >>> print(env.available_envs) ['acrobot', 'ant', 'fast', 'fetch', ...]
為了利用 Brax 的優勢,通常會同時執行多個環境。在以下範例中,我們反覆測試不同的批次大小,並報告短期 rollout 的執行時間
範例
>>> import torch >>> from torch.utils.benchmark import Timer >>> device = "cuda" if torch.cuda.is_available() else "cpu" >>> for batch_size in [4, 16, 128]: ... timer = Timer(''' ... env.rollout(100) ... ''', ... setup=f''' ... import brax.envs ... from torchrl.envs import BraxWrapper ... env = BraxWrapper(brax.envs.get_environment("ant"), batch_size=[{batch_size}], device="{device}") ... env.set_seed(0) ... env.rollout(2) ... ''') ... print(batch_size, timer.timeit(10)) 4 env.rollout(100) setup: [...] 310.00 ms 1 measurement, 10 runs , 1 thread
16 env.rollout(100) setup: […] 268.46 ms 1 measurement, 10 runs , 1 thread
128 env.rollout(100) setup: […] 433.80 ms 1 measurement, 10 runs , 1 thread
可以透過 rollout 進行反向傳播並直接最佳化策略
>>> import brax.envs >>> from torchrl.envs import BraxWrapper >>> from tensordict.nn import TensorDictModule >>> from torch import nn >>> import torch >>> >>> env = BraxWrapper(brax.envs.get_environment("ant"), batch_size=[10], requires_grad=True) >>> env.set_seed(0) >>> torch.manual_seed(0) >>> policy = TensorDictModule(nn.Linear(27, 8), in_keys=["observation"], out_keys=["action"]) >>> >>> td = env.rollout(10, policy) >>> >>> td["next", "reward"].mean().backward(retain_graph=True) >>> print(policy.module.weight.grad.norm()) tensor(213.8605)