Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、環境で遅延可能な演算子のサポートを有効にし、DAG で遅延可能な Google Cloud 演算子を使用する方法について説明します。
Cloud Composer での延期可能な演算子について
少なくとも 1 つのトリガー インスタンス(または復元性の高い環境では少なくとも 2 つ)がある場合は、DAG で遅延可能な演算子とトリガーを使用できます。
遅延可能な演算子の場合、Airflow はタスクの実行を次のステージに分割します。
オペレーションを開始します。この段階で、タスクは Airflow ワーカー スロットを占有しています。タスクは、ジョブを別のサービスに委任するオペレーションを実行します。
たとえば、BigQuery ジョブの実行には数秒から数時間の時間がかかります。ジョブの作成後、オペレーションが作業 ID(BigQuery ジョブ ID)を Airflow トリガーに渡します。
トリガーは、ジョブが完了するまでジョブをモニタリングします。このステージでは、ワーカー スロットは占有されません。Airflow トリガーは非同期アーキテクチャを備えており、このようなジョブを数百件処理できます。トリガーは、ジョブの完了を検出すると、最後のステージをトリガーするイベントを送信します。
最終ステージでは、Airflow ワーカーがコールバックを実行します。このコールバックでは、タスクを成功とマークするか、別のオペレーションを実行して、トリガーでモニタリングするジョブを設定できます。
トリガーはステートレスであるため、中断や再起動に対してレジリエンスがあります。このため、短いと予想される最後のステージ中に再起動が発生しない限り、Pod の再起動があっても長時間実行ジョブを復元可能です。
始める前に
遅延可能な演算子のサポートを有効にする
Airflow トリガーという環境コンポーネントは、環境内のすべての遅延タスクを非同期的にモニタリングします。このようなタスクからの遅延可能なオペレーションが完了すると、トリガーはタスクを Airflow ワーカーに渡します。
DAG で遅延可能モードを使用するには、環境内に少なくとも 1 つのトリガー インスタンス(または高復元環境では少なくとも 2 つ)が必要です。 トリガーは、環境の作成時に構成できます。または、既存の環境のトリガーの数とパフォーマンス パラメータを調整することもできます。
遅延可能モードをサポートするGoogle Cloud 演算子
一部の Airflow オペレータのみが遅延可能なモデルをサポートするように拡張されています。
次のリストは、遅延可能モードをサポートする apache-airflow-providers-google パッケージの演算子のリファレンスです。必要最小限の apache-airflow-providers-google パッケージ バージョンを含む列は、その演算子が遅延可能モードをサポートしている最も古いパッケージ バージョンを表します。
BigQuery Operators
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| BigQueryCheckOperator | 8.4.0 |
| BigQueryValueCheckOperator | 8.4.0 |
| BigQueryIntervalCheckOperator | 8.4.0 |
| BigQueryGetDataOperator | 8.4.0 |
| BigQueryInsertJobOperator | 8.4.0 |
BigQuery Data Transfer Service の演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| BigQueryDataTransferServiceStartTransferRunsOperator | 8.9.0 |
バッチ オペレーター
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| CloudBatchSubmitJobOperator | 10.7.0 |
Cloud Build 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| CloudBuildCreateBuildOperator | 8.7.0 |
Cloud Composer 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| CloudComposerCreateEnvironmentOperator | 6.4.0 |
| CloudComposerDeleteEnvironmentOperator | 6.4.0 |
| CloudComposerUpdateEnvironmentOperator | 6.4.0 |
| CloudComposerRunAirflowCLICommandOperator | 11.0.0 |
Cloud Run 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| CloudRunExecuteJobOperator | 10.7.0 |
Cloud SQL 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| CloudSQLExportInstanceOperator | 10.3.0 |
Storage Transfer Service オペレーター
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| CloudDataTransferServiceS3ToGCSOperator | 14.0.0 |
| CloudDataTransferServiceGCSToGCSOperator | 14.0.0 |
Dataflow 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| DataflowTemplatedJobStartOperator | 8.9.0 |
| DataflowStartFlexTemplateOperator | 8.9.0 |
| DataflowStartYamlJobOperator | 11.0.0 |
Cloud Data Fusion オペレーター
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| CloudDataFusionStartPipelineOperator | 8.9.0 |
Dataplex Universal Catalog オペレーター
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| DataplexRunDataQualityScanOperator | 10.8.0 |
| DataplexGetDataQualityScanResultOperator | 10.8.0 |
| DataplexRunDataProfileScanOperator | 11.0.0 |
Dataproc 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| DataprocCreateClusterOperator | 8.9.0 |
| DataprocDeleteClusterOperator | 8.9.0 |
| DataprocJobBaseOperator | 8.4.0 |
| DataprocInstantiateWorkflowTemplateOperator | 9.0.0 |
| DataprocInstantiateInlineWorkflowTemplateOperator | 10.1.0 |
| DataprocSubmitJobOperator | 8.4.0 |
| DataprocUpdateClusterOperator | 8.9.0 |
| DataprocDiagnoseClusterOperator | 11.0.0 |
| DataprocCreateBatchOperator | 8.9.0 |
Google Kubernetes Engine 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| GKEDeleteClusterOperator | 9.0.0 |
| GKECreateClusterOperator | 9.0.0 |
| GKEStartPodOperator | 12.0.0 |
| GKEStartJobOperator | 11.0.0 |
Pub/Sub オペレーター
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| PubSubPullOperator | 14.0.0 |
AI Platform 演算子
| 演算子名 | 必要な apache-airflow-providers-google のバージョン |
|---|---|
| MLEngineStartTrainingJobOperator | 8.9.0 |
DAG で遅延可能な演算子を使用する
すべての Google Cloud 演算子の一般的な規則では、deferrable ブール型パラメータを使用して遅延可能モードを有効にします。 Google Cloud演算子にこのパラメータがない場合、遅延可能モードで実行することはできません。他の演算子では、異なる規則が適用される場合があります。たとえば、一部のコミュニティ オペレーターには、名前に Async 接尾辞が付いた別のクラスがあります。
次の DAG の例では、遅延可能モードで DataprocSubmitJobOperator 演算子を使用しています。
PYSPARK_JOB = {
"reference": { "project_id": "PROJECT_ID" },
"placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
"pyspark_job": {
"main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
},
}
DataprocSubmitJobOperator(
task_id="dataproc-deferrable-example",
job=PYSPARK_JOB,
deferrable=True,
)
トリガーログを表示する
トリガーは、他の環境コンポーネントのログとともに使用可能なログを生成します。環境ログの表示について詳しくは、ログを表示をご覧ください。
トリガーをモニタリングする
トリガー コンポーネントのモニタリングの詳細については、Airflow の指標をご覧ください。
トリガーのモニタリングに加えて、環境のモニタリング ダッシュボードの [Uncompleted Task] 指標で遅延可能なタスクの数を確認できます。