• 文件 >
  • 從串流建立 tensordict
捷徑

從串流建立 tensordict

作者: Vincent Moens

在許多現實世界的應用中,資料會以不同的頻率連續產生。

例如,來自 IoT 裝置的感測器讀數、金融交易或社群媒體更新都可以產生需要即時處理和分析的資料串流。

當處理此類資料串流時,通常需要將傳入的資料「分桶」成離散的區塊,以便有效地處理和分析。但是,當處理具有不同頻率或格式的資料串流時,這可能會具有挑戰性。

在本教學中,我們將探討如何使用 TensorDict 建立和操作資料串流。我們將學習如何建立張量的延遲堆疊、處理非同步資料串流,以及密集化我們的資料以實現高效的儲存和處理。

在本教學中,您將學習: - 如何讀取資料串流並以固定的間隔將其寫入 tensordict 中; - 如何建立將具有異質形狀的內容堆疊在一起的 TensorDict; - 如果需要,如何使用 nested_tensor 在單個儲存空間中密集化這些張量。

將異質 tensordict 堆疊在一起

在許多現實生活中,資料以具有不同定義頻率的串流形式出現。

本教學的目標是「分桶」即將到來的資料,以便可以以給定的較慢頻率讀取和處理它。 這種情況下的挑戰是,資料可能無法以常規的「矩形」格式(即張量的每個維度都已明確定義)表示,但可能存在一個資料桶比另一個資料桶具有更多元素的情況,在這種情況下,我們無法簡單地將它們堆疊在一起。 通常,考慮資料的第一個和第二個桶如下所示的情況

import torch
from tensordict import TensorDict

bucket0 = TensorDict(stream0=torch.randn(5), stream1=torch.randn(4))
bucket1 = TensorDict(stream0=torch.randn(4), stream1=torch.randn(5))

原則上,我們無法在記憶體中連續堆疊這兩個 tensordict,因為兩個串流的形狀不同。 幸運的是,TensorDict 提供了一種工具來將具有異質張量形狀的實例組合在一起:LazyStackedTensorDict。 若要建立延遲堆疊,只需呼叫 lazy_stack()

data = TensorDict.lazy_stack([bucket0, bucket1], dim=0)
print(data)
LazyStackedTensorDict(
    fields={
        stream0: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False),
        stream1: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False)},
    exclusive_fields={
    },
    batch_size=torch.Size([2]),
    device=None,
    is_shared=False,
    stack_dim=0)

產生的資料只是兩個 tensordict 的表示,就好像它們已沿維度 0 堆疊在一起一樣。 LazyStackedTensorDict 支援 TensorDictBase 類別的大多數常見操作,以下是一些範例

data_select = data.select("stream0")
data_plus_1 = data + 1
data_apply = data.apply(lambda x: x + 1)

此外,對其進行索引將傳回我們用來建立堆疊的原始資料

assert data[0] is bucket0

儘管如此,在某些情況下,人們可能希望具有底層資料的連續表示。 為此,TensorDictBase 提供了一個 densify() 方法,該方法將堆疊可以堆疊的張量,並嘗試將其餘部分表示為 nested_tensor 實例

data_cont = data.densify()

非同步資料串流

現在讓我們切換到更具體的範例,在其中建立一個以給定頻率串流資料的函數(在這種情況下,只是每次迭代遞增 1 的整數)。

為了在不同執行緒之間傳遞資料,此函式會使用作為輸入接收的佇列。

import asyncio
from typing import List


async def generate_numbers(frequency: float, queue: asyncio.Queue) -> None:
    i = 0
    while True:
        await asyncio.sleep(1 / frequency)
        await queue.put(i)
        i += 1

collect_data 函式會從佇列中讀取指定時間長度的資料。一旦 timeout 時間到期,函式就會傳回。

async def collect_data(queue: asyncio.Queue, timeout: float) -> List[int]:
    values = []

    # We create a nested `collect` async function in order to be able to stop it as
    #  soon as timeout is passed (see wait_for below).
    async def collect():
        nonlocal values
        while True:
            value = await queue.get()
            values.append(value)

    task = asyncio.create_task(collect())
    try:
        await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        task.cancel()
    return values

wait7hz 函式會從佇列中讀取指定時間長度的資料。

async def wait7hz() -> None:
    queue = asyncio.Queue()
    generate_task = asyncio.create_task(generate_numbers(7, queue))
    collect_data_task = asyncio.create_task(collect_data(queue, timeout=1))
    values = await collect_data_task
    # The ``generate_task`` has not been terminated
    generate_task.cancel()
    print(values)


asyncio.run(wait7hz())

from typing import Callable, Dict
[0, 1, 2, 3, 4, 5]

現在,我們可以設計一個繼承自 LazyStackedTensorDict 的類別,並從不同的資料流讀取資料,然後將它們註冊到不同的 tensordict 中。LazyStackedTensorDict 的一個很棒的特性是它可以逐步建構,因此我們可以簡單地透過擴展 lazy stack 來註冊新傳入的資料,直到我們收集到足夠的資料為止。以下是 StreamedTensorDict 類別的實作方式。

from tensordict import LazyStackedTensorDict, NestedKey, TensorDictBase


class StreamedTensorDict(LazyStackedTensorDict):
    """A lazy stack class that can be built from a dictionary of streams."""

    @classmethod
    async def from_streams(
        cls,
        streams: Dict[NestedKey, Callable],
        timeout: float,
        batch_size: int,
        densify: bool = True,
    ) -> TensorDictBase:
        td = cls(stack_dim=0)

        # We construct a queue for each stream
        queues = [asyncio.Queue() for _ in range(len(streams))]
        tasks = []
        for stream, queue in zip(streams.values(), queues):
            task = asyncio.create_task(stream(queue))
            tasks.append(task)
        for _ in range(batch_size):
            values_tasks = []
            for queue in queues:
                values_task = asyncio.create_task(collect_data(queue, timeout))
                values_tasks.append(values_task)
            values = await asyncio.gather(*values_tasks)
            td.append(TensorDict(dict(zip(streams.keys(), values))))

        # Cancel the generator tasks
        for task in tasks:
            task.cancel()
        if densify:
            return td.densify(layout=torch.strided)
        return td

最後,main 函式會組合串流函式 stream0stream1,並將它們傳遞給 StreamedTensorDict.from_streams 方法,該方法將收集 batch_size 批次的資料,每次 timeout=1 秒。

async def main() -> TensorDictBase:
    def stream0(queue):
        return generate_numbers(frequency=7, queue=queue)

    def stream1(queue):
        return generate_numbers(frequency=3, queue=queue)

    # Running this should take about 10 seconds
    return await StreamedTensorDict.from_streams(
        {"bucket0": stream0, "bucket1": stream1}, timeout=1, batch_size=10
    )


td = asyncio.run(main())

print("TensorDict from stream", td)
TensorDict from stream TensorDict(
    fields={
        bucket0: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False),
        bucket1: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False)},
    batch_size=torch.Size([10]),
    device=None,
    is_shared=False)
讓我們表示來自兩個資料流的資料 - 應該等於 torch.arange(),針對 batch_size * timeout * Hz

<=> 1 * 10 秒 * 3 或 7

print("bucket0 (7Hz, around 70 values)", td["bucket0"].values())
print("bucket1 (3Hz, around 30 values)", td["bucket1"].values())
print("shapes of bucket0 (7Hz, around 70 values)", td["bucket0"]._nested_tensor_size())
bucket0 (7Hz, around 70 values) tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
        18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
        36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,
        54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68])
bucket1 (3Hz, around 30 values) tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
        18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28])
shapes of bucket0 (7Hz, around 70 values) tensor([[6],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7]])

結論

在本教學中,我們探索了使用 TensorDict 和非同步資料流的基本知識。 我們學習了如何建立 tensors 的 lazy stacks、使用 asyncio 處理非同步資料流,以及 densify 我們的資料以進行有效的儲存和處理。

我們也看到了如何使用 TensorDictLazyStackedTensorDict 來簡化複雜的資料處理任務,例如將具有不同頻率的資料流進行 bucketizing。 透過利用 TensorDict 和 asyncio 的強大功能,您可以建構可擴展且高效的資料處理 pipelines,這些 pipelines 可以處理即使是最嚴苛的真實世界應用程式。

感謝您閱讀本教學! 我們希望您覺得它有幫助且內容豐富。

腳本的總執行時間: (0 分鐘 11.021 秒)

由 Sphinx-Gallery 產生

文件

取得 PyTorch 的完整開發者文件

檢視文件

教學

取得初學者和進階開發人員的深入教學

檢視教學

資源

尋找開發資源並獲得您問題的解答

檢視資源