快捷方式

TorchScript 中的動態平行化

建立於:2020 年 7 月 28 日 | 最後更新:2024 年 12 月 02 日 | 最後驗證:2024 年 11 月 05 日

警告

TorchScript 不再處於積極開發階段。

在本教學中,我們介紹了在 TorchScript 中執行動態運算符間平行化的語法。 這種平行化具有以下屬性

  • 動態 - 建立的平行任務數量及其工作負載可以取決於程式的控制流程。

  • 運算符間 - 平行化涉及平行執行 TorchScript 程式片段。 這與運算符內平行化不同,後者涉及拆分個別運算符並平行執行運算符工作的子集。

基本語法

動態平行化的兩個重要 API 是

  • torch.jit.fork(fn : Callable[..., T], *args, **kwargs) -> torch.jit.Future[T]

  • torch.jit.wait(fut : torch.jit.Future[T]) -> T

展示這些工作原理的一個好方法是透過範例

import torch

def foo(x):
    return torch.neg(x)

@torch.jit.script
def example(x):
    # Call `foo` using parallelism:
    # First, we "fork" off a task. This task will run `foo` with argument `x`
    future = torch.jit.fork(foo, x)

    # Call `foo` normally
    x_normal = foo(x)

    # Second, we "wait" on the task. Since the task may be running in
    # parallel, we have to "wait" for its result to become available.
    # Notice that by having lines of code between the "fork()" and "wait()"
    # call for a given Future, we can overlap computations so that they
    # run in parallel.
    x_parallel = torch.jit.wait(future)

    return x_normal, x_parallel

print(example(torch.ones(1))) # (-1., -1.)

fork() 接受可呼叫物件 fn 和該可呼叫物件的引數 argskwargs,並建立非同步任務來執行 fnfn 可以是函數、方法或 Module 實例。 fork() 傳回對此執行結果值的參考,稱為 Future。 因為 fork 在建立非同步任務後立即傳回,所以 fn 可能尚未在執行 fork() 呼叫後面的程式碼行時執行。 因此,wait() 用於等待非同步任務完成並傳回值。

這些建構可以用來重疊函數中語句的執行(在工作範例區段中顯示)或與迴圈等其他語言建構組合

import torch
from typing import List

def foo(x):
    return torch.neg(x)

@torch.jit.script
def example(x):
    futures : List[torch.jit.Future[torch.Tensor]] = []
    for _ in range(100):
        futures.append(torch.jit.fork(foo, x))

    results = []
    for future in futures:
        results.append(torch.jit.wait(future))

    return torch.sum(torch.stack(results))

print(example(torch.ones([])))

注意

當我們初始化一個空的 Futures 列表時,我們需要為 futures 添加一個顯式的類型註釋。 在 TorchScript 中,空的容器預設為假定它們包含 Tensor 值,因此我們將列表建構函式註釋為 # 類型 List[torch.jit.Future[torch.Tensor]]

此範例使用 fork() 啟動 100 個函數 foo 的實例,等待 100 個任務完成,然後將結果相加,傳回 -100.0

應用範例:雙向 LSTM 的集成

讓我們嘗試將平行化應用到更實際的範例,看看我們可以從中獲得什麼樣的效能。 首先,讓我們定義基準模型:雙向 LSTM 層的集成。

import torch, time

# In RNN parlance, the dimensions we care about are:
# # of time-steps (T)
# Batch size (B)
# Hidden size/number of "channels" (C)
T, B, C = 50, 50, 1024

# A module that defines a single "bidirectional LSTM". This is simply two
# LSTMs applied to the same sequence, but one in reverse
class BidirectionalRecurrentLSTM(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.cell_f = torch.nn.LSTM(input_size=C, hidden_size=C)
        self.cell_b = torch.nn.LSTM(input_size=C, hidden_size=C)

    def forward(self, x : torch.Tensor) -> torch.Tensor:
        # Forward layer
        output_f, _ = self.cell_f(x)

        # Backward layer. Flip input in the time dimension (dim 0), apply the
        # layer, then flip the outputs in the time dimension
        x_rev = torch.flip(x, dims=[0])
        output_b, _ = self.cell_b(torch.flip(x, dims=[0]))
        output_b_rev = torch.flip(output_b, dims=[0])

        return torch.cat((output_f, output_b_rev), dim=2)


# An "ensemble" of `BidirectionalRecurrentLSTM` modules. The modules in the
# ensemble are run one-by-one on the same input then their results are
# stacked and summed together, returning the combined result.
class LSTMEnsemble(torch.nn.Module):
    def __init__(self, n_models):
        super().__init__()
        self.n_models = n_models
        self.models = torch.nn.ModuleList([
            BidirectionalRecurrentLSTM() for _ in range(self.n_models)])

    def forward(self, x : torch.Tensor) -> torch.Tensor:
        results = []
        for model in self.models:
            results.append(model(x))
        return torch.stack(results).sum(dim=0)

# For a head-to-head comparison to what we're going to do with fork/wait, let's
# instantiate the model and compile it with TorchScript
ens = torch.jit.script(LSTMEnsemble(n_models=4))

# Normally you would pull this input out of an embedding table, but for the
# purpose of this demo let's just use random data.
x = torch.rand(T, B, C)

# Let's run the model once to warm up things like the memory allocator
ens(x)

x = torch.rand(T, B, C)

# Let's see how fast it runs!
s = time.time()
ens(x)
print('Inference took', time.time() - s, ' seconds')

在我的機器上,這個網路在 2.05 秒內執行。 我們可以做得更好!

平行化正向和反向層

我們可以做的一個非常簡單的事情是平行化 BidirectionalRecurrentLSTM 中的正向和反向層。 對於這種情況,計算的結構是靜態的,所以我們甚至不需要任何迴圈。 讓我們像這樣重寫 BidirectionalRecurrentLSTMforward 方法

def forward(self, x : torch.Tensor) -> torch.Tensor:
    # Forward layer - fork() so this can run in parallel to the backward
    # layer
    future_f = torch.jit.fork(self.cell_f, x)

    # Backward layer. Flip input in the time dimension (dim 0), apply the
    # layer, then flip the outputs in the time dimension
    x_rev = torch.flip(x, dims=[0])
    output_b, _ = self.cell_b(torch.flip(x, dims=[0]))
    output_b_rev = torch.flip(output_b, dims=[0])

    # Retrieve the output from the forward layer. Note this needs to happen
    # *after* the stuff we want to parallelize with
    output_f, _ = torch.jit.wait(future_f)

    return torch.cat((output_f, output_b_rev), dim=2)

在這個範例中,forward()cell_f 的執行委派給另一個線程,同時它繼續執行 cell_b。 這會導致兩個儲存格的執行相互重疊。

使用這個簡單修改再次執行腳本會產生 1.71 秒的執行時間,提高了 17%

旁註:可視化平行化

我們尚未完成模型的最佳化,但值得介紹我們用於視覺化效能的工具。其中一個重要的工具是 PyTorch 分析器 (profiler)

讓我們使用分析器以及 Chrome 追蹤匯出功能來視覺化我們平行化模型的效能

with torch.autograd.profiler.profile() as prof:
    ens(x)
prof.export_chrome_trace('parallel.json')

這段程式碼會寫出一個名為 parallel.json 的檔案。 如果您在 Google Chrome 中導覽至 chrome://tracing,點擊 Load 按鈕,並載入該 JSON 檔案,您應該會看到如下的時間軸

https://i.imgur.com/rm5hdG9.png

時間軸的水平軸表示時間,垂直軸表示執行緒。 如我們所見,我們一次執行兩個 lstm 實例。 這是我們努力平行化雙向層的結果!

在集成模型中平行化模型

您可能已經注意到我們的程式碼中還有另一個平行化的機會:我們還可以彼此平行地執行 LSTMEnsemble 中包含的模型。 這樣做的方式非常簡單,以下是我們應該如何更改 LSTMEnsembleforward 方法

def forward(self, x : torch.Tensor) -> torch.Tensor:
    # Launch tasks for each model
    futures : List[torch.jit.Future[torch.Tensor]] = []
    for model in self.models:
        futures.append(torch.jit.fork(model, x))

    # Collect the results from the launched tasks
    results : List[torch.Tensor] = []
    for future in futures:
        results.append(torch.jit.wait(future))

    return torch.stack(results).sum(dim=0)

或者,如果您重視簡潔,我們可以使用列表推導式

def forward(self, x : torch.Tensor) -> torch.Tensor:
    futures = [torch.jit.fork(model, x) for model in self.models]
    results = [torch.jit.wait(fut) for fut in futures]
    return torch.stack(results).sum(dim=0)

如簡介中所述,我們使用迴圈為我們集成模型中的每個模型分叉任務。 然後,我們使用另一個迴圈來等待所有任務完成。 這提供了更多的計算重疊。

透過這個小小的更新,腳本在 1.4 秒內運行,總加速了 32%! 對於兩行程式碼來說,非常棒。

我們可以再次使用 Chrome 追蹤器來看看發生了什麼

https://i.imgur.com/kA0gyQm.png

我們現在可以看到所有 LSTM 實例都在完全平行地運行。

結論

在本教學中,我們學習了 fork()wait(),它們是在 TorchScript 中進行動態、互通平行化的基本 API。 我們看到了使用這些函數來平行化 TorchScript 程式碼中函數、方法或 Modules 執行的幾個典型使用模式。 最後,我們完成了一個使用這種技術來最佳化模型的範例,並探索了 PyTorch 中可用的效能測量和視覺化工具。

文件

存取 PyTorch 的完整開發人員文件

檢視文件

教學課程

取得初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並取得您問題的解答

檢視資源