Managed Airflow(第 2 世代)で Apache Airflow DAG を実行する(Google Cloud CLI)
Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(以前の第 1 世代)
このクイックスタート ガイドでは、Managed Service for Apache Airflow 環境を作成し、Managed Airflow(第 2 世代)で Apache Airflow DAG を実行する方法について説明します。
Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用状況の詳細について、Apache Airflow ドキュメントの Airflow のコンセプト チュートリアルをご覧ください。
代わりに Google Cloud コンソールを使用する場合は、Managed Service for Apache Airflow で Apache Airflow DAG を実行するをご覧ください。
Terraform を使用して環境を作成する場合は、環境を作成する(Terraform)をご覧ください。
始める前に
- Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。詳しくは、ロールを付与する方法をご覧ください。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。詳しくは、ロールを付与する方法をご覧ください。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
-
マネージド Airflow API を有効にします。
API を有効にするために必要なロール
API を有効にするには、
serviceusage.services.enable権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。gcloud services enable composer.googleapis.com
-
このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。
-
IAM ロールと権限を割り当てる: プロジェクト IAM 管理者 (
roles/resourcemanager.projectIamAdmin) -
マネージド Airflow 環境のサービス アカウントを作成する: サービス アカウントの作成 (
roles/iam.serviceAccountCreator) -
Managed Airflow 環境を表示、作成、管理するには:
- 環境とストレージ オブジェクトの管理者(
roles/composer.environmentAndStorageObjectAdmin) - サービス アカウント ユーザー (
roles/iam.serviceAccountUser)
- 環境とストレージ オブジェクトの管理者(
-
ログを表示するには: ログビューア (
roles/logging.viewer)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
-
IAM ロールと権限を割り当てる: プロジェクト IAM 管理者 (
環境のサービス アカウントを作成する
環境を作成するときに、サービス アカウントを指定します。このサービス アカウントは、環境のサービス アカウントと呼ばれます。環境でこのサービス アカウントを使用して、ほとんどのオペレーションを実行します。
ご使用の環境のサービス アカウントはユーザー アカウントではありません。サービス アカウントは、ユーザーではなく、アプリケーションや仮想マシン(VM)インスタンスで使用される特別なアカウントです。
環境のサービス アカウントを作成するには:
Identity and Access Management のドキュメントの説明に沿って、新しいサービス アカウントを作成します。
Identity and Access Management のドキュメントに記載されているとおりに、ロールを付与します。必要なロールは Composer ワーカー(
composer.worker)です。
環境の作成
これがプロジェクトの最初の環境の場合は、Managed Airflow サービス エージェント アカウントを新しいプリンシパルとして環境のサービス アカウントに追加して roles/composer.ServiceAgentV2Ext ロールを付与します。
デフォルトでは、環境はデフォルトの Compute Engine サービス アカウントを使用します。また、次の例では、この権限を追加する方法を示します。
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
ENVIRONMENT_SERVICE_ACCOUNT \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
ENVIRONMENT_SERVICE_ACCOUNT は、以前に作成した環境のサービス アカウントに置き換えます。
最新の Managed Airflow(第 2 世代)バージョンで、us-central1 リージョンに example-environment という名前の新しい環境を作成します。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.17.0-airflow-2.11.1 \
--service-account ENVIRONMENT_SERVICE_ACCOUNT
ENVIRONMENT_SERVICE_ACCOUNT は、以前に作成した環境のサービス アカウントに置き換えます。
DAG ファイルを作成する
Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。
このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。
- DAG(
composer_sample_dag)を作成します。この DAG は毎日実行されます。 - タスク(
print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。
ローカルマシンに quickstart.py ファイルのコピーを保存します。
DAG ファイルを環境のバケットにアップロードする
すべての Managed Airflow 環境には、Cloud Storage バケットが関連付けられています。Managed Airflow の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。
DAG のスケジュールを設定するには、quickstart.py をローカルマシンから使用中の環境の /dags フォルダにアップロードします。
Google Cloud CLI で quickstart.py をアップロードするには、quickstart.py ファイルがあるフォルダで次のコマンドを実行します。
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG を表示する
DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。
- アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
- DAG を使用可能な DAG のリストに追加します。
- DAG ファイルで指定したスケジュールに沿って DAG を実行します。
DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、 Google Cloud コンソールで DAG 情報を表示するための Managed Airflow インターフェースです。Managed Airflow は、ネイティブの Airflow ウェブ インターフェースである Airflow UI にもアクセスできます。
以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する
dags listAirflow CLI コマンドを実行します。gcloud composer environments run example-environment \ --location us-central1 \ dags listコマンドの出力に
composer_quickstartDAG のリストが含まれていることを確認します。出力例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
DAG 実行の詳細を表示する
DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。
サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart DAG の DAG 実行を一覧表示します。
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
出力例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI には、タスクログを表示するコマンドはありません。他の方法で Airflow タスクログを表示できます。Managed Airflow DAG UI、Airflow UI、Cloud Logging を使用します。このガイドでは、特定の DAG 実行のログについて Cloud Logging にクエリを実行する方法について説明します。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、Cloud Logging から composer_quickstart DAG の特定の DAG 実行に関するログを読み取ります。--format 引数は、ログメッセージのテキストのみが表示されるように出力を整えます。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
以下のように置き換えます。
RUN_IDは、前に実行したtasks states-for-dag-runコマンドの出力のrun_id値に置き換えます。例:2024-02-17T15:38:38.969307+00:00
出力例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、リソースを含む Google Cloud プロジェクトを削除します。
このチュートリアルで使用したリソースを削除します。
Managed Airflow 環境を削除します。
Google Cloud コンソールで、[環境] ページに移動します。
[
example-environment] を選択し、[削除] をクリックします。環境が削除されるまで待ちます。
環境のバケットを削除します。Managed Airflow 環境を削除しても、バケットは削除されません。
Google Cloud コンソールで、[ストレージ] > [ブラウザ] ページに移動します。
環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を
us-central1-example-environ-c1616fe8-bucketにします。
次のステップ