捷徑

torchrl.collectors 套件

資料收集器在某種程度上等同於 pytorch 資料載入器,但 (1) 它們收集來自非靜態資料來源的資料,並且 (2) 資料是使用模型收集的(可能是正在訓練的模型的版本)。

TorchRL 的資料收集器接受兩個主要引數:一個環境(或一個環境建構子的列表)和一個策略。它們將在傳送堆疊收集的資料給使用者之前,迭代地執行一個環境步驟和一個策略查詢,經過定義的步驟數。每當環境達到完成狀態,和/或在預定義的步驟數之後,環境將會被重置。

由於資料收集可能是一個計算量大的過程,因此適當地配置執行超參數至關重要。首先要考慮的參數是資料收集應該與最佳化步驟串行還是並行發生。SyncDataCollector 類別將在訓練工作者上執行資料收集。MultiSyncDataCollector 將把工作負載分配到多個工作者上,並聚合將傳送給訓練工作者的結果。最後,MultiaSyncDataCollector 將在多個工作者上執行資料收集,並傳送它可以收集的第一批結果。這種執行將會連續且伴隨網路的訓練發生:這意味著用於資料收集的策略權重可能會稍微落後於訓練工作者上的策略配置。因此,儘管此類別可能是收集資料最快的方式,但它只能在可以接受異步收集資料的設定中使用(例如,離策略 RL 或課程 RL)。對於遠端執行的 rollouts (MultiSyncDataCollectorMultiaSyncDataCollector) 必須使用 collector.update_policy_weights_() 或透過在建構子中設定 update_at_each_batch=True 來將遠端策略的權重與訓練工作者的權重同步。

第二個需要考慮的參數(在遠端設定中)是收集資料的裝置,以及執行環境和策略操作的裝置。舉例來說,在 CPU 上執行的策略可能比在 CUDA 上執行的策略慢。當多個推論工作器同時執行時,將計算工作負載分配到可用的裝置上可以加快收集速度或避免 OOM(記憶體不足)錯誤。最後,批量大小和傳遞裝置的選擇(即資料在等待傳遞給收集工作器時儲存的裝置)也可能影響記憶體管理。要控制的關鍵參數是 devices,它控制執行裝置(即策略的裝置),以及 storing_device,它控制在 rollout 期間環境和資料儲存的裝置。一個好的經驗法則是通常對儲存和計算使用相同的裝置,這是在只傳遞 devices 參數時的預設行為。

除了這些計算參數外,使用者還可以選擇配置以下參數:

  • max_frames_per_traj:調用 env.reset() 之後的影格數量。

  • frames_per_batch:每次在收集器上迭代時傳遞的影格數量。

  • init_random_frames:隨機步驟的數量(調用 env.rand_step() 的步驟)。

  • reset_at_each_iter:如果 True,則在每次批次收集後重置環境。

  • split_trajs:如果 True,則軌跡將被分割並以填充的 tensordict 形式傳遞,並帶有指向表示有效值的布林遮罩的 "mask" 鍵。

  • exploration_type:與策略一起使用的探索策略。

  • reset_when_done:環境是否應該在達到完成狀態時重置。

收集器和批量大小

由於每個收集器都有自己組織其內運行的環境的方式,因此資料將根據收集器的具體情況具有不同的批量大小。下表總結了收集資料時的預期情況:

SyncDataCollector

MultiSyncDataCollector (n=B)

MultiaSyncDataCollector (n=B)

cat_results

NA

“stack”

0

-1

NA

單個環境

[T]

[B, T]

[B*(T//B)]

[B*(T//B)]

[T]

批量環境 (n=P)

[P, T]

[B, P, T]

[B * P, T]

[P, T * B]

[P, T]

在這些情況的每一種中,最後一個維度(對於 timeT)都會進行調整,以便批量大小等於傳遞給收集器的 frames_per_batch 參數。

警告

不應將 MultiSyncDataCollectorcat_results=0 一起使用,因為資料將沿著批量維度(對於批量環境)或時間維度(對於單個環境)堆疊,這可能會在兩者交換時引入一些混淆。cat_results="stack" 是一種更好且更一致的與環境互動的方式,因為它可以保持每個維度分離,並在配置、收集器類別和其他組件之間提供更好的互換性。

雖然 MultiSyncDataCollector 有一個對應於正在運行的子收集器數量的維度 (B),但 MultiaSyncDataCollector 沒有。當考慮到 MultiaSyncDataCollector 以先到先得的方式傳遞資料批次時,這很容易理解,而 MultiSyncDataCollector 在傳遞資料之前從每個子收集器收集資料。

收集器和策略副本

將策略傳遞給收集器時,我們可以選擇運行該策略的裝置。這可以用於將策略的訓練版本保存在一個裝置上,而將推論版本保存在另一個裝置上。例如,如果您有兩個 CUDA 裝置,則最好在一個裝置上進行訓練,而在另一個裝置上執行策略以進行推論。如果是這種情況,可以使用 update_policy_weights_() 將參數從一個裝置複製到另一個裝置(如果不需要複製,則此方法不執行任何操作)。

由於目標是避免顯式調用 policy.to(policy_device),因此收集器將對策略結構進行深層複製,並在實例化期間複製放置在新裝置上的參數(如果需要)。由於並非所有策略都支持深層複製(例如,使用 CUDA 圖或依賴第三方庫的策略),因此我們嘗試限制將執行深層複製的情況。下圖顯示了何時會發生這種情況。

../_images/collector-copy.png

收集器中的策略複製決策樹。

收集器和重播緩衝區的互操作性

在必須從重播緩衝區中採樣單個轉換的最簡單情況下,無需過多關注收集器的構建方式。收集後展平資料將是在填充儲存之前的一個足夠的預處理步驟。

>>> memory = ReplayBuffer(
...     storage=LazyTensorStorage(N),
...     transform=lambda data: data.reshape(-1))
>>> for data in collector:
...     memory.extend(data)

如果必須收集軌跡切片,建議的方法是創建一個多維緩衝區,並使用 SliceSampler 採樣器類進行採樣。必須確保傳遞給緩衝區的資料已正確成形,並且 timebatch 維度已明確分離。實際上,以下配置將起作用:

>>> # Single environment: no need for a multi-dimensional buffer
>>> memory = ReplayBuffer(
...     storage=LazyTensorStorage(N),
...     sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = SyncDataCollector(env, policy, frames_per_batch=N, total_frames=-1)
>>> for data in collector:
...     memory.extend(data)
>>> # Batched environments: a multi-dim buffer is required
>>> memory = ReplayBuffer(
...     storage=LazyTensorStorage(N, ndim=2),
...     sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> env = ParallelEnv(4, make_env)
>>> collector = SyncDataCollector(env, policy, frames_per_batch=N, total_frames=-1)
>>> for data in collector:
...     memory.extend(data)
>>> # MultiSyncDataCollector + regular env: behaves like a ParallelEnv iif cat_results="stack"
>>> memory = ReplayBuffer(
...     storage=LazyTensorStorage(N, ndim=2),
...     sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = MultiSyncDataCollector([make_env] * 4,
...     policy,
...     frames_per_batch=N,
...     total_frames=-1,
...     cat_results="stack")
>>> for data in collector:
...     memory.extend(data)
>>> # MultiSyncDataCollector + parallel env: the ndim must be adapted accordingly
>>> memory = ReplayBuffer(
...     storage=LazyTensorStorage(N, ndim=3),
...     sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = MultiSyncDataCollector([ParallelEnv(2, make_env)] * 4,
...     policy,
...     frames_per_batch=N,
...     total_frames=-1,
...     cat_results="stack")
>>> for data in collector:
...     memory.extend(data)

使用 MultiSyncDataCollector 的重播緩衝區採樣軌跡目前並未完全支持,因為資料批次可能來自任何工作器,並且在大多數情況下,寫入緩衝區的連續批次不會來自同一來源(從而中斷軌跡)。

單節點資料收集器

DataCollectorBase()

資料收集器的基底類別。

SyncDataCollector(create_env_fn[, policy, ...])

用於 RL 問題的通用資料收集器。

MultiSyncDataCollector(create_env_fn[, ...])

在不同的處理程序上同步執行給定數量的 DataCollector。

MultiaSyncDataCollector(*args, **kwargs)

在不同的處理程序上非同步執行給定數量的 DataCollector。

aSyncDataCollector(create_env_fn, policy, *, ...)

在獨立的處理程序上執行單個 DataCollector。

分散式資料收集器

TorchRL 提供了一組分散式資料收集器。這些工具支援多種後端('gloo''nccl''mpi' 搭配 DistributedDataCollector 或 PyTorch RPC 搭配 RPCDataCollector)和啟動器('ray'submitittorch.multiprocessing)。它們可以有效地在單個節點或跨多個節點以同步或非同步模式使用。

資源:在專用資料夾中尋找這些收集器的範例。

注意

選擇子收集器:所有分散式收集器都支援各種單機收集器。人們可能會想知道為什麼要使用 MultiSyncDataCollectorParallelEnv 來代替。 通常,多處理收集器的 IO footprint 比需要在每個步驟進行通訊的並行環境低。 但是,模型規格在相反的方向上起作用,因為使用並行環境將導致策略(和/或轉換)的執行速度更快,因為這些操作將被向量化。

注意

選擇收集器(或並行環境)的裝置:在 CPU 上執行的並行環境和多處理環境,透過共享記憶體緩衝區來實現處理程序之間的資料共享。 根據所使用的機器功能,與 cuda 驅動程式原生支援的在 GPU 上共享資料相比,這可能會非常慢。 實際上,這意味著在構建並行環境或收集器時使用 device="cpu" 關鍵字參數可能會導致比使用 device="cuda"(如果可用)更慢的收集速度。

注意

鑑於該庫的許多可選依賴項(例如,Gym、Gymnasium 和許多其他依賴項),警告在多處理/分散式設定中可能會很快變得相當煩人。 預設情況下,TorchRL 會過濾子處理程序中的這些警告。 如果仍然希望看到這些警告,可以透過設定 torchrl.filter_warnings_subprocess=False 來顯示它們。

DistributedDataCollector(create_env_fn, ...)

具有 torch.distributed 後端的分散式資料收集器。

RPCDataCollector(create_env_fn, policy, *, ...)

基於 RPC 的分散式資料收集器。

DistributedSyncDataCollector(create_env_fn, ...)

具有 torch.distributed 後端的分散式同步資料收集器。

submitit_delayed_launcher(num_jobs[, ...])

submitit 的延遲啟動器。

RayCollector(create_env_fn, ...[, ...])

具有 Ray 後端的分散式資料收集器。

輔助函式

split_trajectories(rollout_tensordict, *[, ...])

用於軌跡分離的實用函式。

文件

取得 PyTorch 的完整開發人員文件

檢視文件

教學課程

取得初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並取得您的問題解答

檢視資源