Managed Airflow(Gen 3)で Apache Airflow DAG を実行する(Google Cloud CLI)
Managed Airflow(Gen 3) | Managed Airflow(Gen 2) | Managed Airflow(レガシー Gen 1)
このクイックスタート ガイドでは、Managed Service for Apache Airflow 環境を作成し、Managed Airflow(Gen 3)で Apache Airflow DAG を実行する方法について説明します。
Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用状況の詳細について、Apache Airflow ドキュメントのAirflow のコンセプト チュートリアルをご覧ください。
コンソールを使用する場合 Google Cloud は、Managed Service for Apache Airflow で Apache Airflow DAG を実行する をご覧ください。
環境を Terraform を使用して 作成する場合は、 環境を作成する(Terraform) をご覧ください。
始める前に
- アカウントにログインします。 Google Cloud を初めてご利用の場合は、 Google Cloud アカウントを作成して、 実際のシナリオでプロダクトがどのように機能するかを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します:
gcloud init -
プロジェクトを作成または選択します Google Cloud 。
プロジェクトを選択または作成するために必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトを選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、プロジェクト作成者ロール
(
roles/resourcemanager.projectCreator)が必要です。これにはresourcemanager.projects.create権限が含まれています。ロールを付与する方法を確認する。
-
プロジェクトを作成する: Google Cloud
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します:
gcloud init -
プロジェクトを作成または選択します Google Cloud 。
プロジェクトを選択または作成するために必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトを選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、プロジェクト作成者ロール
(
roles/resourcemanager.projectCreator)が必要です。これにはresourcemanager.projects.create権限が含まれています。ロールを付与する方法を確認する。
-
プロジェクトを作成する: Google Cloud
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
-
Managed Airflow API を有効にします。
API を有効にするために必要なロール
API を有効にするには、 権限を含む Service Usage 管理者 IAM ロール(
roles/serviceusage.serviceUsageAdmin)が必要です。serviceusage.services.enableロールを付与する方法を確認する。gcloud services enable composer.googleapis.com
-
このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。
-
IAM ロールと権限を割り当てるには:
プロジェクト IAM 管理者 (
roles/resourcemanager.projectIamAdmin) -
Managed Airflow 環境のサービス アカウントを作成するには:
サービス アカウントの作成 (
roles/iam.serviceAccountCreator) -
Managed Airflow 環境を表示、作成、管理するには:
- 環境とストレージ オブジェクトの管理者 (
roles/composer.environmentAndStorageObjectAdmin) - サービス アカウント ユーザー (
roles/iam.serviceAccountUser)
- 環境とストレージ オブジェクトの管理者 (
-
ログを表示するには:
ログビューア (
roles/logging.viewer)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
-
IAM ロールと権限を割り当てるには:
プロジェクト IAM 管理者 (
環境のサービス アカウントを作成する
環境を作成するときに、サービス アカウントを指定します。このサービス アカウントは、環境のサービス アカウントと呼ばれます。環境でこのサービス アカウントを使用して、ほとんどのオペレーションを実行します。
ご使用の環境のサービス アカウントはユーザー アカウントではありません。サービス アカウントは、ユーザーではなく、アプリケーションや仮想マシン(VM)インスタンスで使用される特別なアカウントです。
環境のサービス アカウントを作成するには:
Identity and Access Management のドキュメントの説明に沿って、新しいサービス アカウントを作成します。
Identity and Access Management のドキュメントに記載されているとおりに、ロールを付与します。必要なロールはComposer ワーカー (
composer.worker)です。
環境の作成
最新の Managed Airflow(Gen 3)バージョンで、us-central1
リージョンに example-environment という名前の新しい環境を作成します。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.11.1-build.3 \
--service-account ENVIRONMENT_SERVICE_ACCOUNT
ENVIRONMENT_SERVICE_ACCOUNT は、以前に作成した環境のサービス アカウントに置き換えます。
DAG ファイルを作成する
Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。
このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。
- DAG(
composer_sample_dag)を作成します。この DAG は毎日実行されます。 - タスク(
print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。
ローカルマシンに quickstart.py ファイルのコピーを保存します。
DAG ファイルを環境のバケットにアップロードする
すべての Managed Airflow 環境には、Cloud Storage バケットが関連付けられています。Managed Airflow の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。
DAG のスケジュールを設定するには、quickstart.py をローカルマシンから使用中の環境の /dags フォルダにアップロードします。
Google Cloud CLI で quickstart.py をアップロードするには、quickstart.py ファイルがあるフォルダで次のコマンドを実行します。
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG を表示する
DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。
- アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
- DAG を使用可能な DAG のリストに追加します。
- DAG ファイルで指定したスケジュールに沿って DAG を実行します。
DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、 コンソールで Google Cloud DAG 情報を表示するための Managed Airflow インターフェースです。Managed Airflow は、ネイティブの Airflow ウェブ インターフェースである Airflow UIにもアクセスできます。
以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する
dags listAirflow CLI コマンドを実行します。gcloud composer environments run example-environment \ --location us-central1 \ dags listコマンドの出力に
composer_quickstartDAG のリストが含まれていることを確認します。出力例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
DAG 実行の詳細を表示する
DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。
サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart DAG の DAG 実行を一覧表示します。
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
出力例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI には、タスクログを表示するコマンドはありません。Airflow タスクログを表示するには、 Managed Airflow DAG UI、Airflow UI、Cloud Logging などの 他の方法を使用できます。このガイドでは、特定の DAG 実行のログについて Cloud Logging にクエリを実行する方法について説明します。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、Cloud Logging から composer_quickstart DAG の特定の DAG 実行に関するログを読み取ります。--format 引数は、ログメッセージのテキストのみが表示されるように出力を整えます。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
以下のように置き換えます。
RUN_IDは、前に実行したtasks states-for-dag-runコマンドの出力のrun_id値に置き換えます。例:2024-02-17T15:38:38.969307+00:00
出力例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
クリーンアップ
このページで使用したリソースについて、アカウントに課金されないようにするには、リソースを含むプロジェクトを削除します。 Google Cloud Google Cloud
このチュートリアルで使用したリソース を削除します。
Managed Airflow 環境を削除します。
コンソールで、[Environments] ページに移動します。 Google Cloud
[
example-environment] を選択し、[削除] をクリックします。環境が削除されるまで待ちます。
環境のバケットを削除します。Managed Airflow 環境を削除しても、バケットは削除されません。
コンソールで、[ストレージ] > [ブラウザ] ページに移動します。 Google Cloud
環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を
us-central1-example-environ-c1616fe8-bucketにします。
次のステップ