Managed Airflow(Gen 3)で Apache Airflow DAG を実行する(Google Cloud CLI)

Managed Airflow(Gen 3) | Managed Airflow(Gen 2) | Managed Airflow(レガシー Gen 1)

このクイックスタート ガイドでは、Managed Service for Apache Airflow 環境を作成し、Managed Airflow(Gen 3)で Apache Airflow DAG を実行する方法について説明します。

始める前に

  1. アカウントにログインします。 Google Cloud を初めてご利用の場合は、 Google Cloud アカウントを作成して、 実際のシナリオでプロダクトがどのように機能するかを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud CLI をインストールします。

  3. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  4. gcloud CLI を初期化するには、次のコマンドを実行します:

    gcloud init
  5. プロジェクトを作成または選択します Google Cloud

    プロジェクトを選択または作成するために必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトを選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、プロジェクト作成者ロール (roles/resourcemanager.projectCreator)が必要です。これには resourcemanager.projects.create 権限が含まれています。ロールを付与する方法を確認する
    • プロジェクトを作成する: Google Cloud

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Google Cloud プロジェクトの名前に置き換えます。

  6. プロジェクトで課金が有効になっていることを確認します Google Cloud 。

  7. Google Cloud CLI をインストールします。

  8. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  9. gcloud CLI を初期化するには、次のコマンドを実行します:

    gcloud init
  10. プロジェクトを作成または選択します Google Cloud

    プロジェクトを選択または作成するために必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトを選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、プロジェクト作成者ロール (roles/resourcemanager.projectCreator)が必要です。これには resourcemanager.projects.create 権限が含まれています。ロールを付与する方法を確認する
    • プロジェクトを作成する: Google Cloud

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Google Cloud プロジェクトの名前に置き換えます。

  11. プロジェクトで課金が有効になっていることを確認します Google Cloud 。

  12. Managed Airflow API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。serviceusage.services.enableロールを付与する方法を確認する

    gcloud services enable composer.googleapis.com
  13. このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

    ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

    必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

環境のサービス アカウントを作成する

環境を作成するときに、サービス アカウントを指定します。このサービス アカウントは、環境のサービス アカウントと呼ばれます。環境でこのサービス アカウントを使用して、ほとんどのオペレーションを実行します。

ご使用の環境のサービス アカウントはユーザー アカウントではありません。サービス アカウントは、ユーザーではなく、アプリケーションや仮想マシン(VM)インスタンスで使用される特別なアカウントです。

環境のサービス アカウントを作成するには:

  1. Identity and Access Management のドキュメントの説明に沿って、新しいサービス アカウントを作成します

  2. Identity and Access Management のドキュメントに記載されているとおりに、ロールを付与します。必要なロールはComposer ワーカーcomposer.worker)です。

環境の作成

最新の Managed Airflow(Gen 3)バージョンで、us-central1 リージョンに example-environment という名前の新しい環境を作成します。

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-3-airflow-2.11.1-build.5 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

ENVIRONMENT_SERVICE_ACCOUNT は、以前に作成した環境のサービス アカウントに置き換えます。

DAG ファイルを作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。

  1. DAG(composer_sample_dag)を作成します。この DAG は毎日実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

ローカルマシンに quickstart.py ファイルのコピーを保存します。

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG ファイルを環境のバケットにアップロードする

すべての Managed Airflow 環境には、Cloud Storage バケットが関連付けられています。Managed Airflow の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから使用中の環境の /dags フォルダにアップロードします。

Google Cloud CLI で quickstart.py をアップロードするには、quickstart.py ファイルがあるフォルダで次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG を表示する

DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。

  1. アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
  2. DAG を使用可能な DAG のリストに追加します。
  3. DAG ファイルで指定したスケジュールに沿って DAG を実行します。

DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、 コンソールで Google Cloud DAG 情報を表示するための Managed Airflow インターフェースです。Managed Airflow は、ネイティブの Airflow ウェブ インターフェースである Airflow UIにもアクセスできます。

  1. 以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。

  2. Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する dags list Airflow CLI コマンドを実行します。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. コマンドの出力に composer_quickstart DAG のリストが含まれていることを確認します。

    出力例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

DAG 実行の詳細を表示する

DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。

サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart DAG の DAG 実行を一覧表示します。

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

出力例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI には、タスクログを表示するコマンドはありません。Airflow タスクログを表示するには、 Managed Airflow DAG UI、Airflow UI、Cloud Logging などの 他の方法を使用できます。このガイドでは、特定の DAG 実行のログについて Cloud Logging にクエリを実行する方法について説明します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、Cloud Logging から composer_quickstart DAG の特定の DAG 実行に関するログを読み取ります。--format 引数は、ログメッセージのテキストのみが表示されるように出力を整えます。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

以下のように置き換えます。

  • RUN_ID は、前に実行した tasks states-for-dag-run コマンドの出力の run_id 値に置き換えます。例: 2024-02-17T15:38:38.969307+00:00

出力例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

クリーンアップ

このページで使用したリソースについて、アカウントに課金されないようにするには、リソースを含むプロジェクトを削除します。 Google Cloud Google Cloud

このチュートリアルで使用したリソース を削除します。

  1. Managed Airflow 環境を削除します。

    1. コンソールで、[Environments] ページに移動します。 Google Cloud

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Managed Airflow 環境を削除しても、バケットは削除されません。

    1. コンソールで、[ストレージ] > [ブラウザ] ページに移動します。 Google Cloud

      [ストレージ] > [ブラウザ] に移動

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

次のステップ