捷徑

torchrl.data 套件

重播緩衝區

重播緩衝區是非策略 RL 演算法的核心部分。 TorchRL 提供了幾個廣泛使用的重播緩衝區的高效實作

ReplayBuffer(*[, storage, sampler, writer, ...])

通用的、可組合的重播緩衝區類別。

PrioritizedReplayBuffer(*, alpha, beta[, ...])

優先重播緩衝區。

TensorDictReplayBuffer(*[, priority_key])

圍繞 ReplayBuffer 類別的特定 TensorDict 包裝器。

TensorDictPrioritizedReplayBuffer(*, alpha, beta)

圍繞 PrioritizedReplayBuffer 類別的特定 TensorDict 包裝器。

可組合的重播緩衝區

我們還讓使用者能夠組合重播緩衝區。 我們提供廣泛的重播緩衝區使用解決方案,包括支援幾乎所有資料類型; 在記憶體、裝置或實體記憶體上儲存; 幾種抽樣策略; 轉換的使用等。

支援的資料類型和選擇儲存

理論上,回放緩衝區支援任何資料型別,但我們無法保證每個元件都支援所有資料型別。最原始的回放緩衝區實作是由具有 ListStorage 儲存空間的 ReplayBuffer 基底組成。 這種方式效率非常低,但它允許你儲存包含非張量資料的複雜資料結構。連續記憶體中的儲存空間包括 TensorStorageLazyTensorStorageLazyMemmapStorage。這些類別支援 TensorDict 資料作為一等公民,但也支援任何 PyTree 資料結構(例如,元組、列表、字典及其巢狀版本)。TensorStorage 儲存空間要求你在建構時提供儲存空間,而 TensorStorage (RAM, CUDA) 和 LazyMemmapStorage (實體記憶體) 會在第一次擴充後預先分配儲存空間。

以下是一些範例,首先是通用的 ListStorage

>>> from torchrl.data.replay_buffers import ReplayBuffer, ListStorage
>>> rb = ReplayBuffer(storage=ListStorage(10))
>>> rb.add("a string!") # first element will be a string
>>> rb.extend([30, None])  # element [1] is an int, [2] is None

寫入緩衝區的主要入口點是 add()extend()。 也可以使用 __setitem__(),在這種情況下,資料會寫入指定的位置,而不會更新緩衝區的長度或游標。當從緩衝區取樣項目,然後就地更新其值時,這會很有用。

使用 TensorStorage 我們告訴 RB 我們希望儲存空間是連續的,這效率更高,但也更具限制性

>>> import torch
>>> from torchrl.data.replay_buffers import ReplayBuffer, TensorStorage
>>> container = torch.empty(10, 3, 64, 64, dtype=torch.unit8)
>>> rb = ReplayBuffer(storage=TensorStorage(container))
>>> img = torch.randint(255, (3, 64, 64), dtype=torch.uint8)
>>> rb.add(img)

接下來,我們可以避免建立容器並要求儲存空間自動執行此操作。這在使用 PyTrees 和 tensordicts 時非常有用!對於 PyTrees 和其他資料結構,add() 會將傳遞給它的取樣視為該類型的一個實例。extend() 另一方面,會認為資料是可迭代的。對於張量、tensordicts 和列表(見下文),會在根層級尋找可迭代對象。對於 PyTrees,我們假設樹中所有葉子(張量)的前導維度都匹配。如果它們不匹配,extend 將會拋出例外。

>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data.replay_buffers import ReplayBuffer, LazyMemmapStorage
>>> rb_td = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=1)  # max 10 elements stored
>>> rb_td.add(TensorDict({"img": torch.randint(255, (3, 64, 64), dtype=torch.unit8),
...     "labels": torch.randint(100, ())}, batch_size=[]))
>>> rb_pytree = ReplayBuffer(storage=LazyMemmapStorage(10))  # max 10 elements stored
>>> # extend with a PyTree where all tensors have the same leading dim (3)
>>> rb_pytree.extend({"a": {"b": torch.randn(3), "c": [torch.zeros(3, 2), (torch.ones(3, 10),)]}})
>>> assert len(rb_pytree) == 3  # the replay buffer has 3 elements!

注意

在處理數值列表時,extend() 可能具有模稜兩可的簽章,應該將其解釋為 PyTree(在這種情況下,列表中的所有元素都會放入儲存空間中儲存的 PyTree 中的一個切片中)或要一次新增一個的數值列表。為了解決這個問題,TorchRL 在列表和元組之間做出了明確的區分:元組將被視為 PyTree,列表(在根層級)將被解釋為要一次新增到緩衝區的一個堆疊數值。

取樣和索引

可以對回放緩衝區進行索引和取樣。 索引和取樣收集儲存空間中給定索引的資料,然後通過一系列轉換和 collate_fn 來處理它們,這些轉換和 collate_fn 可以傳遞給回放緩衝區的 __init__ 函數。 collate_fn 帶有預設值,預設值應在大多數情況下符合使用者期望,因此你大多數時候都不必擔心它。 轉換通常是 Transform 的實例,即使常規函數也可以使用(在後一種情況下,inv() 方法顯然會被忽略,而在第一種情況下,它可以被用於預處理資料,然後再將其傳遞到緩衝區)。最後,可以通過將執行緒數量通過 prefetch 關鍵字引數傳遞到建構函數來使用多執行緒來實現取樣。我們建議使用者在採用此技術之前,在真實環境中對其進行基準測試,因為不能保證它會在實踐中帶來更快的吞吐量,具體取決於機器和使用環境。

取樣時,batch_size 可以在建構期間傳遞(例如,如果在整個訓練過程中是恆定的)或傳遞給 sample() 方法。

為了進一步改進取樣策略,我們建議你研究我們的取樣器!

以下是一些關於如何從回放緩衝區中取出資料的範例

>>> first_elt = rb_td[0]
>>> storage = rb_td[:] # returns all valid elements from the buffer
>>> sample = rb_td.sample(128)
>>> for data in rb_td:  # iterate over the buffer using the sampler -- batch-size was set in the constructor to 1
...     print(data)

使用以下元件

FlatStorageCheckpointer([done_keys, reward_keys])

以緊湊的形式儲存儲存空間,節省 TED 格式的空間。

H5StorageCheckpointer(*[, checkpoint_file, ...])

以緊湊的形式儲存儲存空間,節省 TED 格式的空間,並使用 H5 格式來儲存資料。

ImmutableDatasetWriter()

不可變資料集的阻塞式寫入器。

LazyMemmapStorage(max_size, *[, ...])

用於 tensors 和 tensordicts 的記憶體映射儲存空間。

LazyTensorStorage(max_size, *[, device, ndim])

用於 tensors 和 tensordicts 的預先分配的 tensor 儲存空間。

ListStorage([max_size])

儲存在列表中的儲存空間。

ListStorageCheckpointer()

ListStoage 的儲存檢查點。

NestedStorageCheckpointer([done_keys, ...])

以緊湊的形式儲存儲存空間,節省 TED 格式的空間,並使用記憶體映射的巢狀 tensors。

PrioritizedSampler(max_capacity, alpha, beta)

重播緩衝區的優先採樣器。

PrioritizedSliceSampler(max_capacity, alpha, ...)

使用優先採樣,沿著第一個維度採樣資料切片,給定開始和停止信號。

RandomSampler()

用於可組合重播緩衝區的均勻隨機採樣器。

RoundRobinWriter(**kw)

用於可組合重播緩衝區的 RoundRobin Writer 類別。

Sampler()

用於可組合重播緩衝區的通用採樣器基底類別。

SamplerWithoutReplacement([drop_last, shuffle])

確保同一個樣本不會出現在連續批次中的資料消耗型採樣器。

SliceSampler(*[, num_slices, slice_len, ...])

沿著第一個維度採樣資料切片,給定開始和停止信號。

SliceSamplerWithoutReplacement(*[, ...])

沿著第一個維度採樣資料切片,給定開始和停止信號,不重複取樣。

Storage(max_size[, checkpointer])

Storage 是重播緩衝區的容器。

StorageCheckpointerBase()

儲存檢查點的公共基底類別。

StorageEnsembleCheckpointer()

用於集成儲存的檢查點。

TensorDictMaxValueWriter([rank_key, reduction])

用於可組合重播緩衝區的 Writer 類別,基於某些排名鍵保留頂部元素。

TensorDictRoundRobinWriter(**kw)

用於可組合、基於 tensordict 的重播緩衝區的 RoundRobin Writer 類別。

TensorStorage(storage[, max_size, device, ndim])

用於 tensors 和 tensordicts 的儲存空間。

TensorStorageCheckpointer()

TensorStorages 的儲存檢查點。

Writer()

ReplayBuffer 基底 Writer 類別。

儲存空間的選擇對重播緩衝區採樣延遲影響很大,尤其是在具有較大資料量的分散式強化學習設定中。 強烈建議在具有共享儲存的分散式設定中使用 LazyMemmapStorage,因為 MemoryMappedTensors 的序列化成本較低,並且能夠指定檔案儲存位置以改進節點故障恢復。 從 https://github.com/pytorch/rl/tree/main/benchmarks/storage 中的粗略基準測試中發現,與使用 ListStorage 相比,以下是平均採樣延遲的改進。

儲存類型

加速

ListStorage

1 倍

LazyTensorStorage

1.83 倍

LazyMemmapStorage

3.44 倍

跨進程共享重播緩衝區

只要重播緩衝區的元件是可共享的,就可以在進程之間共享。 此功能允許多個進程收集資料並協作填充共享重播緩衝區,而不是將資料集中在主進程上,這可能會產生一些資料傳輸開銷。

可共享的儲存空間包括 LazyMemmapStorage 或任何 TensorStorage 的子類,只要它們被實例化並且其內容儲存為記憶體映射的 tensors。 有狀態的 writer(例如 TensorDictRoundRobinWriter)目前不可共享,對於有狀態的採樣器(例如 PrioritizedSampler)也是如此。

共享的重播緩衝區可以在任何有權訪問它的進程上讀取和擴展,如下面的例子所示

>>> from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
>>> import torch
>>> from torch import multiprocessing as mp
>>> from tensordict import TensorDict
>>>
>>> def worker(rb):
...     # Updates the replay buffer with new data
...     td = TensorDict({"a": torch.ones(10)}, [10])
...     rb.extend(td)
...
>>> if __name__ == "__main__":
...     rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(21))
...     td = TensorDict({"a": torch.zeros(10)}, [10])
...     rb.extend(td)
...
...     proc = mp.Process(target=worker, args=(rb,))
...     proc.start()
...     proc.join()
...     # the replay buffer now has a length of 20, since the worker updated it
...     assert len(rb) == 20
...     assert (rb["_data", "a"][:10] == 0).all()  # data from main process
...     assert (rb["_data", "a"][10:20] == 1).all()  # data from remote process

儲存軌跡

將軌跡儲存在回放緩衝區中並不太困難。需要注意的一點是,回放緩衝區的大小預設為儲存空間的主維度大小:換句話說,當儲存多維資料時,建立大小為 1M 的回放緩衝區並不代表儲存 1M 個影格,而是儲存 1M 個軌跡。但是,如果在儲存之前將軌跡(或 episode/rollout)展平,容量仍然會是 1M 個步數。

有一種方法可以規避這個問題,即告訴儲存空間在儲存資料時應考慮多少個維度。這可以透過 ndim 關鍵字引數來完成,所有連續儲存空間(如 TensorStorage 等)都接受這個引數。當多維儲存空間傳遞給緩衝區時,緩衝區會自動將最後一個維度視為「時間」維度,這在 TorchRL 中是慣例。可以透過 ReplayBuffer 中的 dim_extend 關鍵字引數來覆寫此設定。建議使用這種方式來儲存透過 ParallelEnv 或其序列對應物獲得的軌跡,我們將在下面看到。

在對軌跡進行採樣時,可能需要對子軌跡進行採樣,以實現學習的多樣化或提高採樣效率。TorchRL 提供了兩種不同的方法來實現這一點。

  • SliceSampler 允許對儲存在 TensorStorage 的主維度上一個接一個的軌跡的給定數量的切片進行採樣。當使用離線資料集(使用該約定儲存)時,這是 TorchRL 中__尤其__推薦的子軌跡採樣方法。這種策略需要在擴展回放緩衝區之前將軌跡展平,並在採樣後重塑它們。SliceSampler 類別的 docstring 提供了關於這種儲存和採樣策略的詳細資訊。請注意,SliceSampler 與多維儲存空間相容。以下範例展示了如何在展平或不展平 tensordict 的情況下使用此功能。在第一種情況下,我們正在從單個環境收集資料。在這種情況下,我們很樂意使用一個沿第一個維度連接傳入資料的儲存空間,因為集合排程不會引入任何中斷。

    >>> from torchrl.envs import TransformedEnv, StepCounter, GymEnv
    >>> from torchrl.collectors import SyncDataCollector, RandomPolicy
    >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler
    >>> env = TransformedEnv(GymEnv("CartPole-v1"), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=10, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 10:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"])
    tensor([[32],
            [33],
            [34],
            [35],
            [36],
            [37],
            [38],
            [39],
            [40],
            [41],
            [11],
            [12],
            [13],
            [14],
            [15],
            [16],
            [17],
            [...
    

    如果以批次方式運行多個環境,我們仍然可以透過呼叫 data.reshape(-1) 將資料儲存在與之前相同的緩衝區中,這會將 [B, T] 大小展平為 [B * T],但這意味著批次中第一個環境的軌跡會與其他環境的軌跡交錯,這是 SliceSampler 無法處理的情況。為了解决這個問題,我們建議在儲存建構函式中使用 ndim 引數。

    >>> env = TransformedEnv(SerialEnv(2,
    ...     lambda: GymEnv("CartPole-v1")), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=1, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100, ndim=2),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 100:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"].squeeze())
    tensor([[ 6,  5],
            [ 2,  2],
            [ 3,  3],
            [ 4,  4],
            [ 5,  5],
            [ 6,  6],
            [ 7,  7],
            [ 8,  8],
            [ 9,  9],
            [10, 10],
            [11, 11],
            [12, 12],
            [13, 13],
            [14, 14],
            [15, 15],
            [16, 16],
            [17, 17],
            [18,  1],
            [19,  2],
            [...
    
  • 軌跡也可以獨立儲存,主維度的每個元素指向不同的軌跡。這要求軌跡具有一致的形狀(或被填充)。我們提供了一個自訂的 Transform 類別,名為 RandomCropTensorDict,它允許在緩衝區中對子軌跡進行採樣。請注意,與基於 SliceSampler 的策略不同,這裡不需要具有指向開始和停止訊號的 "episode""done" 鍵。以下是如何使用這個類別的範例。

檢查點回放緩衝區

回放緩衝區的每個元件都可能是狀態性的,因此,需要一種專用的序列化方法。我們的回放緩衝區有兩個獨立的 API 用於將其狀態儲存到磁碟上:dumps()loads() 將使用記憶體對應張量和 json 檔案儲存除了 transform 之外的每個元件(儲存空間、寫入器、採樣器)的資料,以儲存元資料。

除了 ListStorage 之外,這適用於所有類別,因為無法預測其內容(因此不符合記憶體對應資料結構,例如可以在 tensordict 庫中找到的那些)。

這個 API 保證了儲存然後載入回來的緩衝區將處於完全相同的狀態,無論我們查看其採樣器的狀態(例如,優先順序樹)、其寫入器的狀態(例如,最大寫入器堆)還是其儲存空間的狀態。

在底層,簡單地呼叫 dumps() 將會為每個元件(除了我們通常不假設可以使用記憶體對應張量進行序列化的 transform 之外)在特定資料夾中呼叫公共的 dumps 方法。

然而,以 TED 格式儲存資料可能會消耗比所需更多的記憶體。如果將連續軌跡儲存在緩衝區中,我們可以透過在根目錄儲存所有觀察結果,並且僅儲存 “next” 子 TensorDict 的觀察結果的最後一個元素,來避免儲存重複的觀察結果,這可以減少高達兩倍的儲存消耗。為了啟用此功能,我們提供了三個檢查點類別:FlatStorageCheckpointer 將丟棄重複的觀察結果以壓縮 TED 格式。在載入時,這個類別會以正確的格式重新寫入觀察結果。如果緩衝區儲存在磁碟上,則此檢查點執行的操作將不需要任何額外的 RAM。NestedStorageCheckpointer 將使用巢狀張量儲存軌跡,使資料表示更加清晰(沿著第一個維度的每個項目代表一個不同的軌跡)。最後,H5StorageCheckpointer 將以 H5DB 格式儲存緩衝區,使使用者能夠壓縮資料並節省更多空間。

警告

檢查點對重播緩衝區做了一些限制性假設。首先,假設 done 狀態準確地表示軌跡的結束(除了為其寫入的最後一個軌跡,寫入游標指示放置截斷訊號的位置)。對於 MARL 的使用,應該注意的是,只允許具有與根 TensorDict 相同元素數量的 done 狀態:如果 done 狀態具有儲存體批次大小中未表示的額外元素,則這些檢查點將會失敗。例如,形狀為 torch.Size([3, 4, 5]) 的 done 狀態在形狀為 torch.Size([3, 4]) 的儲存體中是不允許的。

這是一個關於如何在實務中使用 H5DB 檢查點的具體範例

>>> from torchrl.data import ReplayBuffer, H5StorageCheckpointer, LazyMemmapStorage
>>> from torchrl.collectors import SyncDataCollector
>>> from torchrl.envs import GymEnv, SerialEnv
>>> import torch
>>> env = SerialEnv(3, lambda: GymEnv("CartPole-v1", device=None))
>>> env.set_seed(0)
>>> torch.manual_seed(0)
>>> collector = SyncDataCollector(
>>>     env, policy=env.rand_step, total_frames=200, frames_per_batch=22
>>> )
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb_test = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb.storage.checkpointer = H5StorageCheckpointer()
>>> rb_test.storage.checkpointer = H5StorageCheckpointer()
>>> for i, data in enumerate(collector):
...     rb.extend(data)
...     assert rb._storage.max_size == 102
...     rb.dumps(path_to_save_dir)
...     rb_test.loads(path_to_save_dir)
...     assert_allclose_td(rb_test[:], rb[:])

每當無法使用 dumps() 儲存資料時,另一種方法是使用 state_dict(),它會傳回一個可以使用 torch.save() 儲存並使用 torch.load() 載入的資料結構,然後再呼叫 load_state_dict()。這種方法的缺點是它難以儲存大型資料結構,這是在使用重播緩衝區時常見的情況。

TorchRL Episode Data Format (TED)

在 TorchRL 中,序列資料始終以一種稱為 TorchRL Episode Data Format (TED) 的特定格式呈現。此格式對於 TorchRL 中各種元件的無縫整合和運作至關重要。

某些元件(例如重播緩衝區)對資料格式有些無感。但是,其他元件(特別是環境)很大程度上依賴於它來實現平穩運作。

因此,理解 TED、其目的以及如何與之互動至關重要。本指南將清楚地解釋 TED、其使用原因以及如何有效地使用它。

TED 背後的理由

格式化序列資料可能是一項複雜的任務,尤其是在強化學習 (RL) 領域。作為從業者,我們經常遇到在重置時間傳遞資料的情況(儘管並不總是如此),有時會在軌跡的最後一步提供或丟棄資料。

這種可變性意味著我們可以在資料集中觀察到不同長度的資料,並且並不總是能立即清楚地知道如何跨此資料集的各種元素匹配每個時間步長。考慮以下含糊不清的資料集結構

>>> observation.shape
[200, 3]
>>> action.shape
[199, 4]
>>> info.shape
[200, 3]

乍看之下,似乎 info 和 observation 是同時傳遞的(每次重置時各一個 + 每次 step 呼叫時各一個),正如 action 比它們少一個元素所暗示的那樣。但是,如果 info 少一個元素,我們必須假設它要么在重置時被省略,要么未在軌跡的最後一步中傳遞或記錄。如果沒有適當的資料結構文件,就不可能確定哪個 info 對應於哪個時間步長。

更複雜的是,某些資料集提供不一致的資料格式,其中 observationsinfos 在 rollout 的開始或結束時遺失,並且此行為通常沒有記錄。TED 的主要目標是透過提供清晰且一致的資料表示來消除這些歧義。

TED 的結構

TED 建立在 RL 環境中 Markov Decision Process (MDP) 的規範定義之上。在每個步驟中,一個觀察結果會條件化一個 action,從而導致 (1) 一個新的觀察結果,(2) 一個任務完成的指標(terminated、truncated、done),以及 (3) 一個獎勵訊號。

某些元素可能遺失(例如,在模仿學習環境中,獎勵是可選的),或者可以透過 state 或 info 容器傳遞額外的資訊。在某些情況下,在呼叫 step 期間需要額外的資訊才能獲得觀察結果(例如,在無狀態環境模擬器中)。此外,在某些情況下,「action」(或任何其他資料)不能表示為單個張量,需要以不同的方式組織。例如,在多代理 RL 設定中,動作、觀察、獎勵和完成訊號可能是複合的。

TED 以單一、一致且明確的格式來適應所有這些情境。我們透過在執行動作時設定一個限制,來區分時間步長 tt+1 之間發生的事情。換句話說,在呼叫 env.step 之前存在的所有內容都屬於 t,之後發生的所有內容都屬於 t+1

一般規則是,屬於時間步長 t 的所有內容都儲存在 tensordict 的根目錄,而屬於 t+1 的所有內容都儲存在 tensordict 的 "next" 條目中。這是一個範例

>>> data = env.reset()
>>> data = policy(data)
>>> print(env.step(data))
TensorDict(
    fields={
        action: Tensor(...),  # The action taken at time t
        done: Tensor(...),  # The done state when the action was taken (at reset)
        next: TensorDict(  # all of this content comes from the call to `step`
            fields={
                done: Tensor(...),  # The done state after the action has been taken
                observation: Tensor(...),  # The observation resulting from the action
                reward: Tensor(...),  # The reward resulting from the action
                terminated: Tensor(...),  # The terminated state after the action has been taken
                truncated: Tensor(...),  # The truncated state after the action has been taken
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        observation: Tensor(...),  # the observation at reset
        terminated: Tensor(...),  # the terminated at reset
        truncated: Tensor(...),  # the truncated at reset
    batch_size=torch.Size([]),
    device=cpu,
    is_shared=False)

在 rollout 期間(無論是使用 EnvBase 還是 SyncDataCollector),當 agent 重置其步數計數時,"next" tensordict 的內容會透過 step_mdp() 函式帶到根目錄:t <- t+1。您可以在 此處閱讀有關環境 API 的更多資訊。

在大多數情況下,根目錄中沒有 True 值的 "done" 狀態,因為任何 done 狀態都會觸發(部分)重置,這會將 "done" 變成 False。但是,只有在自動執行重置時,情況才是如此。在某些情況下,部分重置不會觸發重置,因此我們保留這些資料,這些資料的記憶體佔用量應該遠低於觀察資料(例如)。

此格式消除了觀察結果與其動作、資訊或 done 狀態之間匹配的任何模糊之處。

展平 TED 以減少記憶體消耗

TED 在記憶體中複製觀察資料兩次,這可能會影響在實踐中使用此格式的可行性。由於它主要用於簡化表示,因此可以以扁平方式儲存資料,但在訓練期間將其表示為 TED。

這在序列化重播緩衝區時特別有用:例如,TED2Flat 類別確保在將 TED 格式的資料結構寫入磁碟之前先將其展平,而 Flat2TED 載入掛鉤將在反序列化期間取消展平此結構。

Tensordict 的維度

在 rollout 期間,所有收集到的 tensordict 將沿著位於末尾的新維度堆疊。收集器和環境都將使用 "time" 名稱標記此維度。這是一個範例

>>> rollout = env.rollout(10, policy)
>>> assert rollout.shape[-1] == 10
>>> assert rollout.names[-1] == "time"

這確保了時間維度在資料結構中被清楚地標記並且易於識別。

特殊情況和附註

多智能體資料呈現

可以在 MARL 環境 API 部分中存取多智能體資料格式化文件。

基於記憶體的策略(RNN 和 Transformer)

在上面提供的範例中,只有 env.step(data) 會產生需要在下一步中讀取的資料。但是,在某些情況下,策略也會輸出下一步中需要的資訊。對於基於 RNN 的策略來說,通常就是這種情況,它會輸出一個動作以及需要在下一步中使用的循環狀態。為了適應這一點,我們建議使用者調整他們的 RNN 策略,將此資料寫入 tensordict 的 "next" 條目下。這確保了該內容將在下一步中被帶到根目錄。可以在 GRUModuleLSTMModule 中找到更多資訊。

多步

收集器允許使用者在讀取資料時跳過步驟,從而累積即將到來的 n 個步驟的獎勵。這種技術在像 Rainbow 這樣的類 DQN 演算法中很流行。MultiStep 類別對來自收集器的批次執行此資料轉換。在這些情況下,如下檢查將失敗,因為下一個觀察結果偏移了 n 個步驟

>>> assert (data[..., 1:]["observation"] == data[..., :-1]["next", "observation"]).all()

記憶體需求呢?

如果以天真的方式實施,這種資料格式將消耗大約兩倍於平面表示的記憶體。在某些記憶體密集型設定中(例如,在 AtariDQNExperienceReplay 資料集中),我們僅在磁碟上儲存 T+1 觀察資料,並在 get 時間線上執行格式化。在其他情況下,我們假設 2 倍的記憶體成本是為了更清晰的表示而付出的少量代價。但是,推廣離線資料集的延遲表示肯定是一個有益的功能,我們歡迎朝這個方向做出貢獻!

資料集

TorchRL 提供了離線 RL 資料集的包裝器。這些資料以 ReplayBuffer 實例的形式呈現,這意味著可以使用轉換、採樣器和儲存器隨意自訂它們。例如,可以使用 SelectTransformExcludeTransform 將條目篩選出或篩選出資料集。

預設情況下,資料集儲存為記憶體映射張量,允許它們以幾乎不佔用記憶體的方式快速採樣。

這是一個範例

注意

安裝依賴項是使用者的責任。對於 D4RL,需要一個 儲存庫的克隆,因為最新的 wheels 沒有在 PyPI 上發布。對於 OpenML,需要 scikit-learnpandas

轉換資料集

在許多情況下,原始資料不會直接被使用。一個很自然的想法是將一個 Transform 實例傳遞給資料集建構子,並即時修改樣本。這方法是可行的,但會因為轉換而產生額外的執行時間。如果轉換可以(至少部分)預先應用到資料集上,則可以節省相當可觀的磁碟空間,並減少取樣時產生的額外負擔。為此,可以使用 preprocess()。此方法將對資料集的每個元素運行一個逐樣本的預處理流程,並將現有的資料集替換為轉換後的版本。

一旦轉換完成,重新建立相同的資料集將會產生另一個具有相同轉換後儲存空間的物件(除非使用了 download="force")。

>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32, download="force"
... )
>>>
>>> def func(data):
...     return data.set("obs_norm", data.get("observation").norm(dim=-1))
...
>>> dataset.preprocess(
...     func,
...     num_workers=max(1, os.cpu_count() - 2),
...     num_chunks=1000,
...     mp_start_method="fork",
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()
>>> # re-recreating the dataset gives us the transformed version back.
>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()

BaseDatasetExperienceReplay(*[, priority_key])

離線資料集的父類別。

AtariDQNExperienceReplay(dataset_id[, ...])

Atari DQN 經驗回放類別。

D4RLExperienceReplay(dataset_id, batch_size)

D4RL 的經驗回放類別。

GenDGRLExperienceReplay(dataset_id[, ...])

Gen-DGRL 經驗回放資料集。

MinariExperienceReplay(dataset_id, batch_size, *)

Minari 經驗回放資料集。

OpenMLExperienceReplay(name, batch_size[, ...])

OpenML 資料的經驗回放。

OpenXExperienceReplay(dataset_id[, ...])

Open X-Embodiment 資料集的經驗回放。

RobosetExperienceReplay(dataset_id, ...[, ...])

Roboset 經驗回放資料集。

VD4RLExperienceReplay(dataset_id, batch_size, *)

V-D4RL 經驗回放資料集。

組合資料集

在離線 RL 中,通常會同時使用多個資料集。此外,TorchRL 通常具有細粒度的資料集命名法,其中每個任務都單獨表示,而其他函式庫會以更緊湊的方式表示這些資料集。為了讓使用者能夠將多個資料集合併在一起,我們提出了一個 ReplayBufferEnsemble 基本元件,允許使用者一次從多個資料集中取樣。

如果個別資料集的格式不同,則可以使用 Transform 實例。在以下範例中,我們建立兩個具有語義上相同條目的虛擬資料集,這些條目在名稱上有所不同(("some", "key")"another_key"),並展示如何重新命名它們以使其具有匹配的名稱。我們還會調整圖片大小,以便它們可以在取樣期間堆疊在一起。

>>> from torchrl.envs import Comopse, ToTensorImage, Resize, RenameTransform
>>> from torchrl.data import TensorDictReplayBuffer, ReplayBufferEnsemble, LazyMemmapStorage
>>> from tensordict import TensorDict
>>> import torch
>>> rb0 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform([("some", "key")], ["renamed"]),
...     ),
... )
>>> rb1 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform(["another_key"], ["renamed"]),
...     ),
... )
>>> rb = ReplayBufferEnsemble(
...     rb0,
...     rb1,
...     p=[0.5, 0.5],
...     transform=Resize(33, in_keys=["pixels"], out_keys=["pixels33"]),
... )
>>> data0 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 244, 244, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 244, 244, 3)),
...         ("some", "key"): torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> data1 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 64, 64, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 64, 64, 3)),
...         "another_key": torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> rb[0].extend(data0)
>>> rb[1].extend(data1)
>>> for _ in range(2):
...     sample = rb.sample(10)
...     assert sample["next", "pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels33"].shape == torch.Size([2, 5, 3, 33, 33])
...     assert sample["renamed"].shape == torch.Size([2, 5])

ReplayBufferEnsemble(*rbs[, storages, ...])

重播緩衝區的集合。

SamplerEnsemble(*samplers[, p, ...])

取樣器的集合。

StorageEnsemble(*storages[, transforms])

儲存體的集合。

WriterEnsemble(*writers)

寫入器的集合。

TensorSpec

TensorSpec 父類別及其子類別定義了 TorchRL 中狀態、觀察、動作、獎勵和完成狀態的基本屬性,例如它們的形狀、裝置、dtype 和域。

重要的是,您的環境規格必須與其傳送和接收的輸入和輸出相符,因為 ParallelEnv 將會根據這些規格建立緩衝區,以便與產生的程序進行通訊。請查看 torchrl.envs.utils.check_env_specs() 方法以進行健全性檢查。

如果需要,可以使用 make_composite_from_td() 函式從資料自動產生規格。

規格分為兩個主要類別:數值型和類別型。

數值型 TensorSpec 子類別。

數值型

有界

無界

有界離散

有界連續

無界離散

無界連續

每當建立 Bounded 實例時,它的域(由其 dtype 隱式定義,或由 “domain” 關鍵字引數顯式定義)將決定實例化的類別是 BoundedContinuous 類型還是 BoundedDiscrete 類型。同樣適用於 Unbounded 類別。有關更多資訊,請參閱這些類別。

類別型 TensorSpec 子類別。

類別型

OneHot

MultiOneHot

類別型

MultiCategorical

Binary

gymnasium 不同,TorchRL 沒有任意規格列表的概念。如果需要將多個規格組合在一起,TorchRL 假設資料將以字典形式呈現(更具體地說,以 TensorDict 或相關格式呈現)。在這些情況下,對應的 TensorSpec 類別是 Composite 規格。

儘管如此,規格可以使用 stack() 堆疊在一起:如果它們是相同的,它們的形狀將相應地擴展。 否則,將通過 Stacked 類別創建一個延遲堆疊。

類似地,TensorSpecsTensorTensorDict 具有一些共同的行為:它們可以像常規的 Tensor 實例一樣,被重塑(reshaped)、索引(indexed)、壓縮(squeezed)、擴展(unsqueezed)、移動到另一個裝置(to)或解綁(unbind)。

如果某些維度是 -1,則稱規格是「動態的」,並且負維度表示相應的資料具有不一致的形狀。 當被優化器或環境(例如,批次化環境,如 ParallelEnv)看到時,這些負形狀會告訴 TorchRL 避免使用緩衝區,因為張量形狀是不可預測的。

TensorSpec(shape, space, ...)

張量元資料容器的父類別。

Binary([n, shape, device, dtype])

二元離散張量規格。

Bounded(*args, **kwargs)

有界張量規格。

Categorical(n[, shape, device, dtype, mask])

離散張量規格。

Composite(*args, **kwargs)

TensorSpecs 的組合。

MultiCategorical(nvec[, shape, device, ...])

離散張量規格的串聯。

MultiOneHot(nvec[, shape, device, dtype, ...])

one-hot 離散張量規格的串聯。

NonTensor([shape, device, dtype])

非張量資料的規格。

Stacked(*specs, dim)

張量規格堆疊的延遲表示。

StackedComposite(*args, **kwargs)

複合規格堆疊的延遲表示。

Unbounded(*args, **kwargs)

無界張量規格。

UnboundedContinuous(*args, **kwargs)

torchrl.data.Unbounded 的專門版本,具有連續空間。

UnboundedDiscrete(*args, **kwargs)

torchrl.data.Unbounded 的專門版本,具有離散空間。

以下類別已棄用,僅指向上面的類別

BinaryDiscreteTensorSpec(*args, **kwargs)

torchrl.data.Binary 的已棄用版本。

BoundedTensorSpec(*args, **kwargs)

torchrl.data.Bounded 的已棄用版本。

CompositeSpec(*args, **kwargs)

torchrl.data.Composite 的已棄用版本。

DiscreteTensorSpec(*args, **kwargs)

torchrl.data.Categorical 的已棄用版本。

LazyStackedCompositeSpec(*args, **kwargs)

torchrl.data.StackedComposite 的已棄用版本。

LazyStackedTensorSpec(*args, **kwargs)

torchrl.data.Stacked 的已棄用版本。

MultiDiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.MultiCategorical 版本。

MultiOneHotDiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.MultiOneHot 版本。

NonTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.NonTensor 版本。

OneHotDiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.OneHot 版本。

UnboundedContinuousTensorSpec(*args, **kwargs)

具有連續空間,已棄用的 torchrl.data.Unbounded 版本。

UnboundedDiscreteTensorSpec(*args, **kwargs)

具有離散空間,已棄用的 torchrl.data.Unbounded 版本。

樹和森林

TorchRL 提供了一組類別和函式,可用於有效率地表示樹和森林。

BinaryToDecimal(num_bits, device, dtype[, ...])

一個將二進位編碼張量轉換為十進位的模組。

HashToInt()

將雜湊值轉換為可用於索引連續儲存體的整數。

QueryModule(*args, **kwargs)

一個產生儲存體相容索引的模組。

RandomProjectionHash(*[, n_components, ...])

一個將隨機投影與 SipHash 結合以獲得低維張量的模組,更易於透過 SipHash 嵌入。

SipHash([as_tensor])

一個用於計算給定張量的 SipHash 值的模組。

TensorDictMap(*args, **kwargs)

TensorDict 的 Map-Storage。

TensorMap()

一個用於實作不同儲存體的抽象。

從人類回饋中進行強化學習 (RLHF)

在從人類回饋中進行強化學習 (RLHF) 時,資料至關重要。鑑於這些技術通常用於語言領域,而語言在函式庫中 RL 的其他子網域中很少涉及,因此我們提供特定的實用工具來促進與 datasets 等外部函式庫的互動。這些實用工具包含用於標記資料、以適用於 TorchRL 模組的方式格式化資料以及優化儲存以實現高效採樣的工具。

PairwiseDataset(chosen_data, rejected_data, ...)

PromptData(input_ids, attention_mask, ...[, ...])

PromptTensorDictTokenizer(tokenizer, max_length)

prompt 資料集的 Tokenization 配方。

RewardData(input_ids, attention_mask[, ...])

RolloutFromModel(model, ref_model, reward_model)

一個用於使用因果語言模型執行 rollout 的類別。

TensorDictTokenizer(tokenizer, max_length[, ...])

一個處理函式的工廠,該函式將 tokenizer 應用於文字範例。

TokenizedDatasetLoader(split, max_length, ...)

載入 tokenized 資料集,並快取其記憶體對應副本。

create_infinite_iterator(iterator)

無限期地迭代一個迭代器。

get_dataloader(batch_size, block_size, ...)

建立一個資料集並從中傳回一個 dataloader。

ConstantKLController(*[, kl_coef, model])

常數 KL 控制器。

AdaptiveKLController(*, init_kl_coef, ...[, ...])

Adaptive KL 控制器,如 Ziegler 等人在 "Fine-Tuning Language Models from Human Preferences" 中所述。

實用工具

MultiStep(gamma, n_steps)

多步獎勵轉換。

consolidate_spec(spec[, ...])

給定一個 TensorSpec,透過新增 0 形狀的 spec 來移除獨佔鍵。

check_no_exclusive_keys(spec[, recurse])

給定一個 TensorSpec,如果沒有獨佔鍵,則傳回 true。

contains_lazy_spec(spec)

如果 spec 包含惰性堆疊的 spec,則傳回 true。

Nested2TED([done_key, shift_key, ...])

將巢狀的 tensordict(其中每一列都是一個軌跡)轉換為 TED 格式。

Flat2TED([done_key, shift_key, is_full_key, ...])

一個儲存加載 hook,用於將扁平化的 TED 數據反序列化為 TED 格式。

H5Combine()

將持久性 tensordict 中的軌跡組合到儲存在檔案系統中的單個獨立 tensordict 中。

H5Split([done_key, shift_key, is_full_key, ...])

將使用 TED2Nested 準備的資料集分割成一個 TensorDict,其中每個軌跡都作為其父巢狀張量的視圖儲存。

TED2Flat([done_key, shift_key, is_full_key, ...])

一個儲存儲存 hook,用於以緊湊的格式序列化 TED 數據。

TED2Nested(*args, **kwargs)

將 TED 格式的資料集轉換為一個 tensordict,其中填充了巢狀張量,並且每一列都是一個軌跡。

MultiStepTransform(n_steps, gamma, *[, ...])

用於 ReplayBuffers 的多步轉換。

文件

訪問 PyTorch 的完整開發者文檔

查看文檔

教程

獲取針對初學者和高級開發者的深入教程

查看教程

資源

查找開發資源並獲得您問題的解答

查看資源