注意
前往結尾以下載完整的範例程式碼。
開始使用資料收集與儲存¶
作者: Vincent Moens
注意
若要在筆記本中執行本教學,請在開頭新增一個安裝儲存格,其中包含
!pip install tensordict !pip install torchrl
沒有資料就沒有學習。在監督式學習中,使用者習慣使用 DataLoader
等工具將資料整合到他們的訓練迴圈中。Dataloader 是一種可迭代的物件,為你提供將用於訓練模型的資料。
TorchRL 以類似的方式處理資料載入的問題,儘管它在 RL 函式庫的生態系統中非常獨特。TorchRL 的資料載入器稱為 DataCollectors
。大多數情況下,資料收集並不止於原始資料的收集,因為資料需要暫時儲存在緩衝區(或用於 on-policy 演算法的等效結構)中,然後才能被 損失模組 使用。本教學將探討這兩個類別。
資料收集器¶
此處討論的主要資料收集器是 SyncDataCollector
,這是本文檔的重點。從根本上講,收集器是一個簡單的類別,負責在環境中執行你的策略,在必要時重置環境,並提供預定義大小的批次。與 rollout()
方法(在 環境教學 中示範)不同,收集器不會在連續的資料批次之間重置。因此,兩個連續的資料批次可能包含來自相同軌跡的元素。
你需要傳遞給收集器的基本引數是你想要收集的批次大小 (frames_per_batch
)、迭代器的長度(可能是無限的)、策略和環境。為了簡單起見,我們將在本範例中使用一個虛擬的隨機策略。
import torch
torch.manual_seed(0)
from torchrl.collectors import SyncDataCollector
from torchrl.envs import GymEnv
from torchrl.envs.utils import RandomPolicy
env = GymEnv("CartPole-v1")
env.set_seed(0)
policy = RandomPolicy(env.action_spec)
collector = SyncDataCollector(env, policy, frames_per_batch=200, total_frames=-1)
我們現在期望我們的收集器提供大小為 200
的批次,無論收集過程中發生什麼。換句話說,我們可能在這個批次中有多个軌跡!total_frames
指示收集器應該運作多久。-1
的值將產生一個永不停止的收集器。
讓我們迭代收集器,以了解這些資料的外觀
for data in collector:
print(data)
break
TensorDict(
fields={
action: Tensor(shape=torch.Size([200, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([200]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False),
done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False)
正如你所看到的,我們的資料使用了一些收集器特定的中繼資料進行了增強,這些中繼資料分組在 "collector"
子 tensordict 中,這是我們在 環境 rollout 期間沒有看到的。這對於追蹤軌跡 ID 很有用。在以下列表中,每個項目都標記了相應轉換所屬的軌跡編號
print(data["collector", "traj_ids"])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
6, 6, 6, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
7, 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9,
9, 9, 9, 9, 9, 9, 9, 9])
在編寫最先進的演算法時,資料收集器非常有用,因為效能通常取決於特定技術在給定次數的環境互動中解決問題的能力(收集器中的 total_frames
參數)。因此,我們範例中的大多數訓練迴圈都如下所示:
>>> for data in collector:
... # your algorithm here
回放緩衝區¶
既然我們已經探討了如何收集資料,我們想知道如何儲存它。在 RL 中,典型的設置是資料被收集、暫時儲存,並在一段時間後根據某種啟發式方法清除:先進先出或其他方法。一個典型的虛擬碼如下所示:
>>> for data in collector:
... storage.store(data)
... for i in range(n_optim):
... sample = storage.sample()
... loss_val = loss_fn(sample)
... loss_val.backward()
... optim.step() # etc
在 TorchRL 中儲存資料的父類稱為 ReplayBuffer
。TorchRL 的回放緩衝區是可組合的:您可以編輯儲存類型、取樣技術、寫入啟發式或應用於它們的轉換。我們將把這些高級功能留給專門的深入教學。通用回放緩衝區只需要知道它必須使用哪種儲存方式。通常,我們建議使用 TensorStorage
子類,它在大多數情況下都能正常工作。在本教學中,我們將使用 LazyMemmapStorage
,它具有兩個優點:首先,由於它是 "lazy" 的,您不需要事先明確告訴它您的資料是什麼樣的。其次,它使用 MemoryMappedTensor
作為後端,以有效的方式將您的資料儲存在磁碟上。您唯一需要知道的是您希望緩衝區有多大。
from torchrl.data.replay_buffers import LazyMemmapStorage, ReplayBuffer
buffer = ReplayBuffer(storage=LazyMemmapStorage(max_size=1000))
可以使用 add()
(單個元素)或 extend()
(多個元素)方法來填充緩衝區。使用我們剛才收集的資料,我們一次性初始化並填充緩衝區
indices = buffer.extend(data)
我們可以檢查緩衝區現在擁有的元素數量與我們從收集器獲得的元素數量相同
assert len(buffer) == collector.frames_per_batch
剩下唯一要知道的是如何從緩衝區收集資料。自然地,這依賴於 sample()
方法。因為我們沒有指定取樣必須在沒有重複的情況下進行,所以不能保證從我們的緩衝區收集的樣本是唯一的
sample = buffer.sample(batch_size=30)
print(sample)
TensorDict(
fields={
action: Tensor(shape=torch.Size([30, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([30]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False)
同樣,我們的樣本看起來與我們從收集器收集的資料完全相同!
下一步¶
您可以查看其他多進程收集器,例如
MultiSyncDataCollector
或MultiaSyncDataCollector
。如果您有多個節點用於推論,TorchRL 也提供分散式收集器。在 API 參考 中查看它們。
查看專用的 回放緩衝區教學 以了解有關建立緩衝區時可用的選項的更多資訊,或 API 參考,其中詳細介紹了所有功能。回放緩衝區具有無數功能,例如多線程取樣、優先經驗回放等等…
為了簡化起見,我們省略了回放緩衝區的可迭代性。親自試一試:建立一個緩衝區並在建構函數中指示其批次大小,然後嘗試迭代它。這相當於在迴圈中呼叫
rb.sample()
!
腳本的總運行時間:(0 分鐘 22.051 秒)
估計的記憶體使用量: 321 MB