Airflow コマンドライン インターフェースにアクセスする

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow には、DAG のトリガーと管理、DAG 実行とタスクに関する情報の取得、接続とユーザーの追加と削除など、タスクの実行に使用できるコマンドライン インターフェース(CLI)があります。

サポートされている Airflow CLI コマンド

Airflow は、Airflow ドキュメントで説明されている Airflow 2 CLI 構文を使用します。

サポートされている Airflow CLI コマンドの完全な一覧については、gcloud composer environments run コマンドのリファレンスをご覧ください。

始める前に

  • Cloud Composer で Google Cloud CLI を使用して Airflow CLI コマンドを実行するには、権限が必要です。

  • Airflow CLI コマンドは environments.executeAirflowCommand 割り当てを消費します。

  • Cloud Composer 3 では、kubectl を使用して Airflow CLI コマンドを実行することはできません。この方法は非推奨であり、2.4.0 より前のバージョンの Cloud Composer でのみ必要です。

Airflow CLI コマンドを実行

環境で Airflow CLI コマンドを実行するには、gcloud CLI を使用します。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

次のように置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • SUBCOMMAND: サポートされている Airflow CLI コマンドのいずれか。
  • SUBCOMMAND_ARGUMENTS は、Airflow CLI コマンドの引数に置き換えます。

サブコマンド引数の区切り文字

-- で、指定された Airflow CLI コマンドの引数を区切ります。

  • 複合 CLI コマンドをサブコマンドとして指定します。
  • 複合コマンドの引数は、-- 区切り文字の後にサブコマンド引数として指定します。

例:

gcloud composer environments run example-environment \
    dags list -- --output=json

デフォルトのロケーション

大部分の gcloud composer コマンドでは、ロケーションを指定する必要があります。ロケーションを指定するには、--location フラグを使用するか、デフォルトのロケーションを設定します

たとえば、Cloud Composer 環境で ID 5077 を指定し、sample_quickstart という名前の DAG をトリガーするには、次のようにします。

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Cloud Composer API を使用して Airflow CLI コマンドを実行する

Cloud Composer 3 では、Cloud Composer API を使用して Airflow CLI コマンドを実行できます。

コマンドの実行

environments.executeAirflowCommand API リクエストを作成します。

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • LOCATION: 環境が配置されているリージョン。
  • ENVIRONMENT_NAME: 環境の名前。
  • AIRFLOW_COMMAND: 実行する Airflow CLI コマンド(dags など)。
  • AIRFLOW_SUBCOMMAND: 実行する Airflow CLI コマンドのサブコマンド(list など)。
  • (省略可)SUBCOMMAND_PARAMETER: サブコマンドのパラメータ。複数のパラメータを使用する場合は、リストにアイテムを追加します。

例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

ポーリング コマンドのステータス

Cloud Composer API を使用して Airflow CLI コマンドを実行した後、PollAirflowCommand リクエストを発行し、エラーとステータス コード用の exitInfo のフィールドを調べて、コマンドが正常に完了したかどうかを確認します。output フィールドにはログ行が含まれます。

コマンドの実行ステータスを取得してログを取得するには、ExecuteAirflowCommandRequest から返された executionIdpodpodNamespace の値を指定します。

例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "a117da94-355d-4ad4-839e-ac39ccb0bf48",
  "pod": "airflow-webserver-66d96b858f-tn96b",
  "podNamespace": "airflow-2-10-2-build-13-226523e4",
  "nextLineNumber": 1
}

次のステップ