捷徑

進階 KubeFlow 流程範例

這是一個僅使用 TorchX 元件建構的 KubeFlow 流程範例。

KFP 轉接器可用於將 TorchX 元件直接轉換為可在 KFP 中使用的內容。

輸入參數

讓我們先為流程定義一些參數。

import argparse
import os.path
import sys
from typing import Dict

import kfp
import torchx
from torchx import specs
from torchx.components.dist import ddp as dist_ddp
from torchx.components.serve import torchserve
from torchx.components.utils import copy as utils_copy, python as utils_python
from torchx.pipelines.kfp.adapter import container_from_app


parser = argparse.ArgumentParser(description="example kfp pipeline")

TorchX 元件圍繞映像建構。根據您使用的排程器,這可能會有所不同,但對於 KFP,這些映像會指定為 Docker 容器。我們為範例應用程式準備了一個容器,為標準內建應用程式準備了另一個容器。如果您修改 torchx 範例程式碼,則需要在 KFP 上啟動它之前重建容器

parser.add_argument(
    "--image",
    type=str,
    help="docker image to use for the examples apps",
    default=torchx.IMAGE,
)

大多數 TorchX 元件使用 fsspec 來抽象化處理遠端檔案系統。這允許元件採用 s3:// 之類的路徑,以便輕鬆使用雲端儲存體提供者。

parser.add_argument(
    "--output_path",
    type=str,
    help="path to place the data",
    required=True,
)
parser.add_argument("--load_path", type=str, help="checkpoint path to load from")

此範例使用 torchserve 進行推論,因此我們需要指定一些選項。這假設您在同一個 Kubernetes 叢集中執行了一個 TorchServe 執行個體,其服務名稱為 torchserve,位於預設命名空間中。

請參閱 https://github.com/pytorch/serve/blob/master/kubernetes/README.md 以瞭解如何設定 TorchServe。

parser.add_argument(
    "--management_api",
    type=str,
    help="path to the torchserve management API",
    default="http://torchserve.default.svc.cluster.local:8081",
)
parser.add_argument(
    "--model_name",
    type=str,
    help="the name of the inference model",
    default="tiny_image_net",
)

筆記本。

if "NOTEBOOK" in globals():
    argv = [
        "--output_path",
        "/tmp/output",
    ]
else:
    argv = sys.argv[1:]

args: argparse.Namespace = parser.parse_args(argv)

建立元件

第一步是將資料下載到我們可以處理的地方。為此,我們可以使用內建的複製元件。此元件採用兩個有效的 fsspec 路徑,並將它們從一個複製到另一個。在這種情況下,我們使用 http 作為來源,並使用 output_path 下的檔案作為輸出。

data_path: str = os.path.join(args.output_path, "tiny-imagenet-200.zip")
copy_app: specs.AppDef = utils_copy(
    "http://cs231n.stanford.edu/tiny-imagenet-200.zip",
    data_path,
    image=args.image,
)

下一個元件用於資料預處理。它會接收來自上一個運算元的原始資料,並對其執行一些轉換,以便與訓練器一起使用。

datapreproc 會將資料輸出到指定的 fsspec 路徑。這些路徑都是提前指定的,因此我們有一個完全靜態的流程。

processed_data_path: str = os.path.join(args.output_path, "processed")
datapreproc_app: specs.AppDef = utils_python(
    "--output_path",
    processed_data_path,
    "--input_path",
    data_path,
    "--limit",
    "100",
    image=args.image,
    m="torchx.examples.apps.datapreproc.datapreproc",
    cpu=1,
    memMB=1024,
)

接下來,我們將建立訓練器元件,它將接收來自先前 datapreproc 元件的訓練資料。我們在單獨的元件檔案中定義了這一點,就像您通常那樣。

擁有單獨的元件檔案允許您透過 torchx run 從 TorchX CLI 啟動您的訓練器,以實現快速迭代,以及以自動化方式從流程中執行它。

# make sure examples is on the path
if "__file__" in globals():
    sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", ".."))


logs_path: str = os.path.join(args.output_path, "logs")
models_path: str = os.path.join(args.output_path, "models")

trainer_app: specs.AppDef = dist_ddp(
    *(
        "--output_path",
        models_path,
        "--load_path",
        args.load_path or "",
        "--log_path",
        logs_path,
        "--data_path",
        processed_data_path,
        "--epochs",
        str(1),
    ),
    image=args.image,
    m="torchx.examples.apps.lightning.train",
    j="1x1",
    # per node resource settings
    cpu=1,
    memMB=3000,
)

為了讓 tensorboard 路徑顯示在 KFP 的 UI 中,我們需要一些中繼資料,以便 KFP 知道從哪裡使用指標。

這將在我們建立 KFP 容器時使用。

ui_metadata: Dict[str, object] = {
    "outputs": [
        {
            "type": "tensorboard",
            "source": os.path.join(logs_path, "lightning_logs"),
        }
    ]
}

對於推論,我們利用了其中一個內建的 TorchX 元件。此元件接收模型並將其上傳到 TorchServe 管理 API 端點。

serve_app: specs.AppDef = torchserve(
    model_path=os.path.join(models_path, "model.mar"),
    management_api=args.management_api,
    image=args.image,
    params={
        "model_name": args.model_name,
        # set this to allocate a worker
        # "initial_workers": 1,
    },
)

對於模型可解釋性,我們利用了一個儲存在其自身元件檔案中的自訂元件。此元件接收來自 datapreproc 和訓練元件的輸出,並生成具有整合梯度結果的映像。

interpret_path: str = os.path.join(args.output_path, "interpret")
interpret_app: specs.AppDef = utils_python(
    *(
        "--load_path",
        os.path.join(models_path, "last.ckpt"),
        "--data_path",
        processed_data_path,
        "--output_path",
        interpret_path,
    ),
    image=args.image,
    m="torchx.examples.apps.lightning.interpret",
)

流程定義

最後一步是透過 KFP 轉接器使用 torchx 元件定義實際流程,並匯出可以上傳到 KFP 叢集的流程套件。

KFP 轉接器目前不追蹤輸入和輸出,因此需要透過 .after() 指定容器的依賴關係。

我們呼叫 .set_tty() 使元件的日誌更具回應性,例如出於範例目的。

def pipeline() -> None:
    # container_from_app creates a KFP container from the TorchX app
    # definition.
    copy = container_from_app(copy_app)
    copy.container.set_tty()

    datapreproc = container_from_app(datapreproc_app)
    datapreproc.container.set_tty()
    datapreproc.after(copy)

    # For the trainer we want to log that UI metadata so you can access
    # tensorboard from the UI.
    trainer = container_from_app(trainer_app, ui_metadata=ui_metadata)
    trainer.container.set_tty()
    trainer.after(datapreproc)

    if False:
        serve = container_from_app(serve_app)
        serve.container.set_tty()
        serve.after(trainer)

    if False:
        # Serve and interpret only require the trained model so we can run them
        # in parallel to each other.
        interpret = container_from_app(interpret_app)
        interpret.container.set_tty()
        interpret.after(trainer)


kfp.compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipeline.yaml",
)

with open("pipeline.yaml", "rt") as f:
    print(f.read())

完成所有這些操作後,您應該會有一個流程檔案(通常是 pipeline.yaml),您可以透過 UI 或 kfp.Client 將其上傳到您的 KFP 叢集。

# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png'

**指令碼總執行時間:**(0 分鐘 0.000 秒)

由 Sphinx-Gallery 生成的圖庫

文件

取得 PyTorch 的完整開發人員文件

檢視文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並獲得問題解答

檢視資源