
Airflow

For pipelines that support Python based execution you can directly use the TorchX API. TorchX is designed to be easily integrated into other applications via the programmatic API. No special Airflow integrations are needed.

With TorchX you can use Airflow for the pipeline orchestration and run your PyTorch application (i.e. distributed training) on a remote GPU cluster.
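
Since the runner is an ordinary Python API, the same calls also work in a plain script with no Airflow involved at all. A minimal standalone sketch, assuming only that torchx is installed in the current environment:

# Standalone sketch: launch a TorchX component without any orchestrator.
from torchx.runner import get_runner

with get_runner() as runner:
    # Run the builtin utils.sh component on the local_cwd scheduler.
    app_id = runner.run_component(
        "utils.sh",
        ["echo", "Hello, TorchX!"],
        scheduler="local_cwd",
    )

    # Block until the job finishes and fail loudly if it didn't succeed.
    status = runner.wait(app_id, wait_interval=1)
    status.raise_for_status()
    print(status)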

[1]:
import datetime
import pendulum

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from airflow.models.dag import DAG
from airflow.decorators import task


DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)

To launch a TorchX job from Airflow you can create an Airflow Python task to import the runner, launch the job and wait for it to complete. If you're running on a remote cluster you may need to use the virtualenv task to install the torchx package (see the sketch after the next cell for one way to do that).

[2]:
@task(task_id='hello_torchx')
def run_torchx(message):
    """This is a function that will run within the DAG execution"""
    from torchx.runner import get_runner
    with get_runner() as runner:
        # Run the utils.sh component on the local_cwd scheduler.
        app_id = runner.run_component(
            "utils.sh",
            ["echo", message],
            scheduler="local_cwd",
        )

        # Wait for the job to complete
        status = runner.wait(app_id, wait_interval=1)

        # Raise_for_status will raise an exception if the job didn't succeed
        status.raise_for_status()

        # Finally we can print all of the log lines from the TorchX job so it
        # will show up in the workflow logs.
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")

Once we have the task defined we can put it in an Airflow DAG and run it as normal.

[3]:
from torchx.schedulers.ids import make_unique

with DAG(
    dag_id=make_unique('example_python_operator'),
    schedule_interval=None,
    start_date=DATA_INTERVAL_START,
    catchup=False,
    tags=['example'],
) as dag:
    run_job = run_torchx("Hello, TorchX!")


dagrun = dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DATA_INTERVAL_START,
    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
    start_date=DATA_INTERVAL_END,
    run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="hello_torchx")
ti.task = dag.get_task(task_id="hello_torchx")
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
/tmp/ipykernel_52183/454499020.py:3: RemovedInAirflow3Warning: Param `schedule_interval` is deprecated and will be removed in a future release. Please use `schedule` instead.
[2024-07-17T02:04:16.962+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_python_operator-mz5brbw7kwqxs.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>
[2024-07-17T02:04:16.968+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_python_operator-mz5brbw7kwqxs.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>
[2024-07-17T02:04:16.969+0000] {taskinstance.py:2306} INFO - Starting attempt 1 of 1
[2024-07-17T02:04:16.970+0000] {taskinstance.py:2388} WARNING - cannot record queued_duration for task hello_torchx because previous state change time has not been saved
[2024-07-17T02:04:16.981+0000] {taskinstance.py:2330} INFO - Executing <Task(_PythonDecoratedOperator): hello_torchx> on 2021-09-13 00:00:00+00:00
[2024-07-17T02:04:17.253+0000] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_python_operator-mz5brbw7kwqxs' AIRFLOW_CTX_TASK_ID='hello_torchx' AIRFLOW_CTX_EXECUTION_DATE='2021-09-13T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2021-09-13T00:00:00+00:00'
[2024-07-17T02:04:17.255+0000] {taskinstance.py:430} INFO - ::endgroup::
[2024-07-17T02:04:17.953+0000] {api.py:72} INFO - Tracker configurations: {}
[2024-07-17T02:04:17.957+0000] {local_scheduler.py:771} INFO - Log directory not set in scheduler cfg. Creating a temporary log dir that will be deleted on exit. To preserve log directory set the `log_dir` cfg option
[2024-07-17T02:04:17.957+0000] {local_scheduler.py:777} INFO - Log directory is: /tmp/torchx_39w81v6k
Hello, TorchX!
[2024-07-17T02:04:18.062+0000] {python.py:237} INFO - Done. Returned value was: None
[2024-07-17T02:04:18.063+0000] {taskinstance.py:441} INFO - ::group::Post task execution logs
[2024-07-17T02:04:18.071+0000] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=example_python_operator-mz5brbw7kwqxs, task_id=hello_torchx, run_id=manual__2021-09-13T00:00:00+00:00, execution_date=20210913T000000, start_date=20240717T020416, end_date=20240717T020418

If all went well you should see Hello, TorchX! printed above in the output.
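
The same task shape extends to a remote GPU cluster, which is where the distributed training use case mentioned at the top comes in: swap the component for the builtin dist.ddp component, point the runner at a remote scheduler, and pass a scheduler-specific cfg. The sketch below assumes the Kubernetes scheduler; the image, script, namespace and queue values are placeholders for your own cluster setup, and the exact cfg keys depend on the scheduler you use:

@task(task_id="train_on_cluster")  # hypothetical task id
def run_distributed_training():
    from torchx.runner import get_runner

    with get_runner() as runner:
        # dist.ddp is TorchX's builtin distributed training component;
        # "-j 1x2" requests 1 node x 2 processes per node.
        app_id = runner.run_component(
            "dist.ddp",
            [
                "-j", "1x2",
                "--script", "train.py",                 # placeholder script
                "--image", "my-registry/train:latest",  # placeholder image
            ],
            scheduler="kubernetes",
            cfg={
                "namespace": "default",  # placeholder cluster settings
                "queue": "default",
            },
        )

        # Wait on the remote job and surface failures to Airflow.
        status = runner.wait(app_id, wait_interval=5)
        status.raise_for_status()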
