Managed Airflow(第 2 世代)で Apache Airflow DAG を実行する(Google Cloud CLI)

Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(以前の第 1 世代)

このクイックスタート ガイドでは、Managed Service for Apache Airflow 環境を作成し、Managed Airflow(第 2 世代)で Apache Airflow DAG を実行する方法について説明します。

始める前に

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud CLI をインストールします。

  3. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  4. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  5. Google Cloud プロジェクトを作成または選択します

    プロジェクトの選択または作成に必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、resourcemanager.projects.create 権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。詳しくは、ロールを付与する方法をご覧ください。
    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Google Cloud プロジェクトの名前に置き換えます。

  6. Google Cloud プロジェクトに対して課金が有効になっていることを確認します

  7. Google Cloud CLI をインストールします。

  8. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  9. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  10. Google Cloud プロジェクトを作成または選択します

    プロジェクトの選択または作成に必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、resourcemanager.projects.create 権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。詳しくは、ロールを付与する方法をご覧ください。
    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Google Cloud プロジェクトの名前に置き換えます。

  11. Google Cloud プロジェクトに対して課金が有効になっていることを確認します

  12. マネージド Airflow API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。

    gcloud services enable composer.googleapis.com
  13. このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

    ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

    必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

環境のサービス アカウントを作成する

環境を作成するときに、サービス アカウントを指定します。このサービス アカウントは、環境のサービス アカウントと呼ばれます。環境でこのサービス アカウントを使用して、ほとんどのオペレーションを実行します。

ご使用の環境のサービス アカウントはユーザー アカウントではありません。サービス アカウントは、ユーザーではなく、アプリケーションや仮想マシン(VM)インスタンスで使用される特別なアカウントです。

環境のサービス アカウントを作成するには:

  1. Identity and Access Management のドキュメントの説明に沿って、新しいサービス アカウントを作成します

  2. Identity and Access Management のドキュメントに記載されているとおりに、ロールを付与します。必要なロールは Composer ワーカーcomposer.worker)です。

環境の作成

これがプロジェクトの最初の環境の場合は、Managed Airflow サービス エージェント アカウントを新しいプリンシパルとして環境のサービス アカウントに追加して roles/composer.ServiceAgentV2Ext ロールを付与します。

デフォルトでは、環境はデフォルトの Compute Engine サービス アカウントを使用します。また、次の例では、この権限を追加する方法を示します。

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    ENVIRONMENT_SERVICE_ACCOUNT \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

ENVIRONMENT_SERVICE_ACCOUNT は、以前に作成した環境のサービス アカウントに置き換えます。

最新の Managed Airflow(第 2 世代)バージョンで、us-central1 リージョンに example-environment という名前の新しい環境を作成します。

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.17.0-airflow-2.11.1 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

ENVIRONMENT_SERVICE_ACCOUNT は、以前に作成した環境のサービス アカウントに置き換えます。

DAG ファイルを作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。

  1. DAG(composer_sample_dag)を作成します。この DAG は毎日実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

ローカルマシンに quickstart.py ファイルのコピーを保存します。

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG ファイルを環境のバケットにアップロードする

すべての Managed Airflow 環境には、Cloud Storage バケットが関連付けられています。Managed Airflow の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから使用中の環境の /dags フォルダにアップロードします。

Google Cloud CLI で quickstart.py をアップロードするには、quickstart.py ファイルがあるフォルダで次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG を表示する

DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。

  1. アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
  2. DAG を使用可能な DAG のリストに追加します。
  3. DAG ファイルで指定したスケジュールに沿って DAG を実行します。

DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、 Google Cloud コンソールで DAG 情報を表示するための Managed Airflow インターフェースです。Managed Airflow は、ネイティブの Airflow ウェブ インターフェースである Airflow UI にもアクセスできます。

  1. 以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。

  2. Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する dags list Airflow CLI コマンドを実行します。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. コマンドの出力に composer_quickstart DAG のリストが含まれていることを確認します。

    出力例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

DAG 実行の詳細を表示する

DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。

サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart DAG の DAG 実行を一覧表示します。

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

出力例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI には、タスクログを表示するコマンドはありません。他の方法で Airflow タスクログを表示できます。Managed Airflow DAG UI、Airflow UI、Cloud Logging を使用します。このガイドでは、特定の DAG 実行のログについて Cloud Logging にクエリを実行する方法について説明します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、Cloud Logging から composer_quickstart DAG の特定の DAG 実行に関するログを読み取ります。--format 引数は、ログメッセージのテキストのみが表示されるように出力を整えます。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

以下のように置き換えます。

  • RUN_ID は、前に実行した tasks states-for-dag-run コマンドの出力の run_id 値に置き換えます。例: 2024-02-17T15:38:38.969307+00:00

出力例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

クリーンアップ

このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、リソースを含む Google Cloud プロジェクトを削除します。

このチュートリアルで使用したリソースを削除します。

  1. Managed Airflow 環境を削除します。

    1. Google Cloud コンソールで、[環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Managed Airflow 環境を削除しても、バケットは削除されません。

    1. Google Cloud コンソールで、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

次のステップ