Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(以前の第 1 世代)
このページでは、KubernetesPodOperator を使用して、Managed Service for Apache Airflow から Managed Service for Apache Airflow 環境の一部である Google Kubernetes Engine クラスタに Kubernetes Pod をデプロイする方法について説明します。
KubernetesPodOperator は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。
KubernetesPodOperator は、以下を必要とする場合に適しています。
- 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
- Managed Airflow ワーカーのストック画像では使用できないバイナリ依存関係。
始める前に
Managed Airflow(第 3 世代)の KubernetesPodOperator と Managed Airflow(第 2 世代)の KubernetesPodOperator の違いの一覧を確認し、DAG が互換性があることを確認します。
Managed Airflow(第 3 世代)でカスタム名前空間を作成することはできません。Pod は、別の名前空間が指定されている場合でも、常に
composer-user-workloads名前空間で実行されます。この名前空間内の Pod は、追加の構成を行うことなくプロジェクトのリソースと VPC ネットワーク(有効な場合)にアクセスできます。Managed Airflow(第 3 世代)では、複数の追加のサイドカー コンテナを実行することはできません。サイドカー コンテナの名前が
airflow-xcom-sidecarの場合は、1 つのサイドカー コンテナを実行できます。Kubernetes Secret と ConfigMap は Kubernetes API を使用して作成することができません。代わりに、Managed Airflow は、Kubernetes Secret と ConfigMap を管理するための Google Cloud CLI コマンド、Terraform リソース、Cloud Composer API を提供します。詳細については、Kubernetes Secretと ConfigMap を使用するをご覧ください。
Managed Airflow(第 3 世代)にカスタム ワークロードをデプロイすることはできません。変更できるのは Kubernetes Secret と ConfigMap のみで、他の構成変更はできません。
リソース要件(CPU、メモリ、ストレージ)は、サポートされている値を使用して指定する必要があります。
Managed Airflow(第 2 世代)と同様に、Pod アフィニティの構成は使用できません。Pod アフィニティを使用する場合は、GKE オペレーターを使用して、別のクラスタで Pod を起動します。
Managed Airflow(第 3 世代)の KubernetesPodOperator について
このセクションでは、Managed Airflow(第 3 世代)で KubernetesPodOperator がどのように機能するかについて説明します。
リソースの利用
Managed Airflow(第 3 世代)では、環境のクラスタは自動的にスケーリングされます。KubernetesPodOperator を使用して実行する追加のワークロードは、環境とは独立してスケーリングされます。環境は、リソース需要の増加による影響を受けませんが、環境のクラスタは、リソース需要に応じてスケールアップおよびスケールダウンされます。
環境のクラスタで実行する追加のワークロードの料金は Managed Airflow(第 3 世代)の料金モデルに従い、Managed Airflow(第 3 世代)の SKU を使用します。
Managed Airflow(第 3 世代)では、コンピューティング クラスの概念が導入されている Autopilot クラスタを使用します。
Managed Airflow は、
general-purposeコンピューティング クラスのみをサポートしています。デフォルトでは、クラスが選択されていない場合、KubernetesPodOperator を使用して Pod を作成する際には
general-purposeクラスが想定されます。各クラスは特定のプロパティとリソース制限に関連付けられています。それらの詳細については、Autopilot のドキュメントをご覧ください。 たとえば、
general-purposeクラス内で実行される Pod は、最大 110 GiB のメモリを使用できます。
プロジェクトのリソースへのアクセス
マネージド Airflow(第 3 世代)では、環境のクラスタはテナント プロジェクトに配置され、構成することはできません。Pod は環境のクラスタ内の分離された名前空間で実行されます。
Managed Airflow(第 3 世代)では、別の名前空間が指定されている場合でも、Pod は常に composer-user-workloads 名前空間で実行されます。この名前空間内の Pod は、追加の構成を行うことなく、プロジェクト内の Google Cloudリソースと VPC ネットワーク(有効になっている場合)にアクセスできます。これらのリソースへのアクセスには、環境のサービス アカウントが使用されます。別のサービス アカウントを指定することはできません。
構成を最小限にしたい
KubernetesPodOperator の作成に必要になるのは、Pod の name、使用する image、task_id パラメータのみです。/home/airflow/composer_kube_config には、GKE に対する認証に必要な認証情報が含まれています。
追加構成
この例では、KubernetesPodOperator で構成できる追加のパラメータを示します。
詳細については、以下のリソースをご覧ください。
Kubernetes Secret と ConfigMap の使用方法については、Kubernetes Secret と ConfigMap を使用するをご覧ください。
KubernetesPodOperator で Jinja テンプレートを使用する方法については、Jinja テンプレートを使用するをご覧ください。
リソース要件(CPU、メモリ、ストレージ)でサポートされている値については、リソース要件をご覧ください。
KubernetesPodOperator パラメータの詳細については、Airflow ドキュメントのオペレーターのリファレンスをご覧ください。
Jinja テンプレートを使用する
Airflow は、DAG で Jinja テンプレートをサポートしています。
必要な Airflow パラメータ(task_id、name、image)を演算子で宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmds、arguments、env_vars、config_file など)をテンプレート化できます。
この例の env_vars パラメータは、my_value という名前の Airflow 変数から設定されます。例の DAG は、Airflow の vars テンプレート変数から値を取得します。Airflow には、さまざまな種類の情報にアクセスできる変数が他にもあります。たとえば、conf テンプレート変数を使用して、Airflow 構成オプションの値にアクセスできます。詳細と Airflow で使用可能な変数のリストについては、Airflow ドキュメントのテンプレート リファレンスをご覧ください。
DAG を変更するか env_vars 変数を作成しないと、変数が存在しないため、例の ex-kube-templates タスクは失敗します。この変数は、Airflow UI または Google Cloud CLI で作成します。
Airflow UI
Airflow UI に移動します。
ツールバーで、[管理者] > [変数] を選択します。
[List Variable] ページで、[新しいレコードを追加する] をクリックします。
[Add Variable] ページで、次の情報を入力します。
- Key:
my_value - Val:
example_value
- Key:
[保存] をクリックします。
gcloud
次のコマンドを入力します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
以下のように置き換えます。
ENVIRONMENTを環境の名前に置き換えます。LOCATIONは、環境が配置されているリージョン。
次の例では、KubernetesPodOperator で Jinja テンプレートを使用する方法を示します。
Kubernetes Secret と ConfigMap を使用する
Kubernetes Secret は、センシティブ データを含むオブジェクトです。Kubernetes ConfigMap は、Key-Value ペアに非機密データを含むオブジェクトです。
Managed Airflow(第 3 世代)では、Google Cloud CLI、API、Terraform を使用して Secret と ConfigMap を作成し、KubernetesPodOperator からそれらにアクセスできます。
- Google Cloud CLI と API を使用する場合は、YAML 構成ファイルを指定します。
- Terraform では、Terraform 構成ファイルで Secret と ConfigMap を別個のリソースとして定義します。
YAML 構成ファイルについて
Google Cloud CLI と API を使用して Kubernetes Secret または ConfigMap を作成する場合は、YAML 形式のファイルを指定します。このファイルは、Kubernetes Secret と ConfigMap で使用される形式と同じ形式にする必要があります。Kubernetes ドキュメントには、ConfigMap と Secret の多くのコードサンプルが用意されています。使用を開始するには、Secret を使用して安全に認証情報を配布するページと ConfigMaps をご覧ください。
Kubernetes Secret の場合と同じように、Secret で値を定義する場合は base64 表現を使用します。
値をエンコードするには、次のコマンドを使用します(これは、Base64 でエンコードされた値を取得する多くの方法の一つです)。
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
出力:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
次の 2 つの YAML ファイルの例は、このガイドの後半のサンプルで使用されます。Kubernetes Secret の YAML 構成ファイルの例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
ファイルを含める方法を示す別の例。前の例と同様に、まずファイルの内容(cat ./key.json | base64)をエンコードしてから、YAML ファイルでこの値を指定します。
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap の YAML 構成ファイルの例。ConfigMap で base64 表現を使用する必要はありません。
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Kubernetes Secret を管理する
gcloud
シークレットを作成
Kubernetes Secret を作成するには、次のコマンドを実行します。
gcloud beta composer environments user-workloads-secrets create \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--secret-file-path SECRET_FILE
以下を置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。SECRET_FILE: Secret の構成を含むローカル YAML ファイルへのパス。
例:
gcloud beta composer environments user-workloads-secrets create \
--environment example-environment \
--location us-central1 \
--secret-file-path ./secrets/example-secret.yaml
Secret を更新する
Kubernetes Secret を更新するには、次のコマンドを実行します。Secret の名は指定された YAML ファイルから取得され、Secret のコンテンツが置き換えられます。
gcloud beta composer environments user-workloads-secrets update \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--secret-file-path SECRET_FILE
以下を置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。SECRET_FILE: Secret の構成を含むローカル YAML ファイルへのパス。このファイルのmetadata>nameフィールドに Secret の名を指定します。
シークレットの一覧表示
環境の Secrets とそのフィールドの一覧を取得するには、次のコマンドを実行します。出力におけるキー値はアスタリスクに置き換えられます。
gcloud beta composer environments user-workloads-secrets list \
--environment ENVIRONMENT_NAME \
--location LOCATION
以下を置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
Secret の詳細を取得する
Secret の詳細情報を取得するには、次のコマンドを実行します。出力におけるキー値はアスタリスクに置き換えられます。
gcloud beta composer environments user-workloads-secrets describe \
SECRET_NAME \
--environment ENVIRONMENT_NAME \
--location LOCATION
以下を置き換えます。
SECRET_NAME: Secret の名前。Secret の構成を含む YAML ファイルのmetadata>nameフィールドで定義されています。ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
Secret を削除する
Secret を削除するには、次のコマンドを実行します。
gcloud beta composer environments user-workloads-secrets delete \
SECRET_NAME \
--environment ENVIRONMENT_NAME \
--location LOCATION
SECRET_NAME: Secret の名前。Secret の構成を含む YAML ファイルのmetadata>nameフィールドで定義されています。ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
API
シークレットを作成
environments.userWorkloadsSecrets.createAPI リクエストを作成します。このリクエストで次のように操作します。
- リクエストの本文の
nameフィールドに、新しい Secret の URI を指定します。 - リクエスト本文の
dataフィールドに、Secret のキーと Base64 でエンコードされた値を指定します。
- リクエストの本文の
例:
// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets
{
"name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
"data": {
"example": "ZXhhbXBsZV92YWx1ZSAtbgo="
}
}
Secret を更新する
environments.userWorkloadsSecrets.updateAPI リクエストを作成します。このリクエストで次のように操作します。
- リクエストの本文の
nameフィールドに、Secret の URI を指定します。 - リクエスト本文の
dataフィールドに、Secret のキーと Base64 でエンコードされた値を指定します。値が置き換えられます。
- リクエストの本文の
例:
// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret
{
"name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
"data": {
"example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
"another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
}
}
シークレットの一覧表示
environments.userWorkloadsSecrets.list API リクエストを作成します。出力に含まれるキー値はアスタリスクに置き換えられます。このリクエストでページネーションを使用できます。詳細については、リクエストのリファレンスをご覧ください。
例:
// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets
Secret の詳細を取得する
environments.userWorkloadsSecrets.get API リクエストを作成します。出力に含まれるキー値はアスタリスクに置き換えられます。
例:
// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret
Secret を削除する
environments.userWorkloadsSecrets.delete API リクエストを作成します。
例:
// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret
Terraform
google_composer_user_workloads_secret リソースは、data ブロックで定義された キー と値を持つ Kubernetes Secret を定義します。
resource "google_composer_user_workloads_secret" "example_secret" {
provider = google-beta
environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
name = "SECRET_NAME"
region = "LOCATION"
data = {
KEY_NAME: "KEY_VALUE"
}
}
ENVIRONMENT_RESOURCE_NAME: Terraform での環境の定義を含む、環境のリソースの名前。実際の環境の名前もこのリソースで指定されます。LOCATION: 環境が配置されているリージョン。SECRET_NAME: Secret の名前。KEY_NAME: この Secret の 1 つ以上のキー。KEY_VALUE: キーの Base64 エンコード値。base64encode関数を使用して値をエンコードできます(例を参照)。
Kubernetes Secret の次の 2 つの例は、このガイドの後半のサンプルで使用します。
resource "google_composer_user_workloads_secret" "example_secret" {
provider = google-beta
name = "airflow-secrets"
environment = google_composer_environment.example_environment.name
region = "us-central1"
data = {
sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
}
}
ファイルを含める方法を示す別の例。file 関数を使用すると、ファイルの内容を文字列として読み取り、Base64 でエンコードできます。
resource "google_composer_user_workloads_secret" "service_account_secret" {
provider = google-beta
name = "service-account"
environment = google_composer_environment.example_environment.name
region = "us-central1"
data = {
"service-account.json": base64encode(file("./key.json"))
}
}
DAG で Kubernetes Secret を使用する
この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。
最初の Secret である airflow-secrets は、SQL_CONN という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Managed Airflow の環境変数とは異なります)。
2 番目の Secret である service-account は、サービス アカウント トークンを含むファイルである service-account.json を /var/secrets/google にマウントします。
Secret オブジェクトは次のようになります。
最初の Kubernetes Secret の名前は、secret_env 変数で定義されます。この Secret の名前は airflow-secrets です。deploy_type パラメータでは、環境変数として公開する必要があることを指定します。環境変数の名前は SQL_CONN で、deploy_target パラメータで指定されます。最後に、SQL_CONN 環境変数の値が sql_alchemy_conn キーの値に設定されます。
2 番目の Kubernetes Secret の名前は、secret_volume 変数で定義されています。この Secret の名前は service-account です。deploy_type で指定されるように、ボリュームとして公開されます。マウントするファイルのパス deploy_target は /var/secrets/google です。最後に、deploy_target に保管される Secret の key は service-account.json です。
演算子の構成は、次のようになります。
Kubernetes ConfigMap を管理する
gcloud
ConfigMap の作成
ConfigMap を作成するには、次のコマンドを実行します。
gcloud beta composer environments user-workloads-config-maps create \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--config-map-file-path CONFIG_MAP_FILE
以下を置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。CONFIG_MAP_FILE: ConfigMap の構成を含むローカル YAML ファイルへのパス。
例:
gcloud beta composer environments user-workloads-config-maps create \
--environment example-environment \
--location us-central1 \
--config-map-file-path ./configs/example-configmap.yaml
ConfigMap を更新する
ConfigMap を更新するには、次のコマンドを実行します。ConfigMap の名前は指定された YAML ファイルから取得され、ConfigMap のコンテンツが置き換えられます。
gcloud beta composer environments user-workloads-config-maps update \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--config-map-file-path CONFIG_MAP_FILE
以下を置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。CONFIG_MAP_FILE: ConfigMap の構成を含むローカル YAML ファイルへのパス。このファイルのmetadata>nameフィールドに ConfigMap の名を指定します。
ConfigMap の一覧表示
環境の ConfigMap とそのフィールドの一覧を取得するには、次のコマンドを実行します。出力に含まれるキー値はそのまま表示されます。
gcloud beta composer environments user-workloads-config-maps list \
--environment ENVIRONMENT_NAME \
--location LOCATION
以下を置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
ConfigMap の詳細を取得する
ConfigMap の詳細情報を取得するには、次のコマンドを実行します。出力に含まれるキー値はそのまま表示されます。
gcloud beta composer environments user-workloads-config-maps describe \
CONFIG_MAP_NAME \
--environment ENVIRONMENT_NAME \
--location LOCATION
以下を置き換えます。
CONFIG_MAP_NAME: ConfigMap の名前。ConfigMap の構成を含む YAML ファイルのmetadata>nameフィールドで定義されています。ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
ConfigMap を削除する
ConfigMap を削除するには、次のコマンドを実行します。
gcloud beta composer environments user-workloads-config-maps delete \
CONFIG_MAP_NAME \
--environment ENVIRONMENT_NAME \
--location LOCATION
CONFIG_MAP_NAME: ConfigMap の名前。ConfigMap の構成を含む YAML ファイルのmetadata>nameフィールドで定義されています。ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
API
ConfigMap の作成
environments.userWorkloadsConfigMaps.createAPI リクエストを作成します。このリクエストで次のように操作します。
- リクエスト本文の
nameフィールドに、新しい ConfigMap の URI を指定します。 - リクエスト本文の
dataフィールドに、ConfigMap のキーと値を指定します。
- リクエスト本文の
例:
// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps
{
"name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
"data": {
"example_key": "example_value"
}
}
ConfigMap を更新する
environments.userWorkloadsConfigMaps.updateAPI リクエストを作成します。このリクエストで次のように操作します。
- リクエスト本文の
nameフィールドに、ConfigMap の URI を指定します。 - リクエスト本文の
dataフィールドに、ConfigMap のキーと値を指定します。 値が置き換えられます。
- リクエスト本文の
例:
// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap
{
"name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
"data": {
"example_key": "example_value",
"another_key": "another_value"
}
}
ConfigMap の一覧表示
environments.userWorkloadsConfigMaps.list API リクエストを作成します。出力に含まれるキー値はそのまま表示されます。このリクエストでページネーションを使用できます。詳細については、リクエストのリファレンスをご覧ください。
例:
// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps
ConfigMap の詳細を取得する
environments.userWorkloadsConfigMaps.get API リクエストを作成します。出力に含まれるキー値はそのまま表示されます。
例:
// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap
ConfigMap を削除する
environments.userWorkloadsConfigMaps.delete API リクエストを作成します。
例:
// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap
Terraform
google_composer_user_workloads_config_map リソースでは ConfigMap を定義します。キーと値は data ブロックで定義します。
resource "google_composer_user_workloads_config_map" "example_config_map" {
provider = google-beta
environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
name = "CONFIG_MAP_NAME"
region = "LOCATION"
data = {
KEY_NAME: "KEY_VALUE"
}
}
ENVIRONMENT_RESOURCE_NAME: Terraform での環境の定義を含む、環境のリソースの名前。実際の環境の名前もこのリソースで指定されます。LOCATION: 環境が配置されているリージョン。CONFIG_MAP_NAME: ConfigMap の名前。KEY_NAME: この ConfigMap の 1 つ以上のキー。KEY_VALUE: キーの値。
例:
resource "google_composer_user_workloads_config_map" "example_config_map" {
provider = google-beta
name = "example-config-map"
environment = google_composer_environment.example_environment.name
region = "us-central1"
data = {
"example_key": "example_value"
}
}
DAG で ConfigMap を使用する
この例では、DAG で ConfigMap を使用する方法を示します。
次の例では、ConfigMap が configmaps パラメータで渡されます。この ConfigMap のすべてのキーは、環境変数として使用できます。
import datetime
from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
with models.DAG(
dag_id="composer_kubernetes_pod_configmap",
schedule=None,
start_date=datetime.datetime(2024, 1, 1),
) as dag:
KubernetesPodOperator(
task_id='kpo_configmap_env_vars',
image='busybox:1.28',
cmds=['sh'],
arguments=[
'-c',
'echo "Value: $example_key"',
],
configmaps=["example-configmap"],
config_file="/home/airflow/composer_kube_config",
)
次の例では、ConfigMap をボリュームとしてマウントする方法を示します。
import datetime
from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
volume_mount = k8s.V1VolumeMount(name='confmap-example',
mount_path='/config',
sub_path=None,
read_only=False)
volume = k8s.V1Volume(name='confmap-example',
config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))
with models.DAG(
dag_id="composer_kubernetes_pod_configmap",
schedule=None,
start_date=datetime.datetime(2024, 1, 1),
) as dag:
KubernetesPodOperator(
task_id='kpo_configmap_volume_mount',
image='busybox:1.28',
cmds=['sh'],
arguments=[
'-c',
'ls /config'
],
volumes=[volume],
volume_mounts=[volume_mount],
configmaps=["example-configmap"],
config_file="/home/airflow/composer_kube_config",
)
CNCF Kubernetes プロバイダに関する情報
KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes プロバイダに実装されています。
CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。
リソース要件
Managed Airflow(第 3 世代)は、リソース要件に対して次の値をサポートしています。リソース要件の使用例については、追加の構成をご覧ください。
| リソース | 最小 | 最大 | ステップ |
|---|---|---|---|
| CPU | 0.25 | 32 | 段階値: 0.25、0.5、1、2、4、6、8、10、...、32。リクエストされた値は、サポートされている最も近い段階値に切り上げられます(例: 5 は 6 に切り上げられます)。 |
| メモリ | 2G(GB) | 128G(GB) | 段階値: 2、3、4、5、...、128。リクエストされた値は、サポートされている最も近い段階値に切り上げられます(3.5G は 4G に切り上げられます)。 |
| ストレージ | - | 100G(GB) | 任意の値。100 GB を超える容量がリクエストされた場合は、100 GB のみが提供されます。 |
Kubernetes のリソース ユニットの詳細については、Kubernetes のリソース ユニットをご覧ください。
トラブルシューティング
このセクションでは、KubernetesPodOperator の一般的な問題のトラブルシューティングに関するアドバイスを提供します。
ログを表示
問題のトラブルシューティング時には、次の順序でログを確認できます。
Airflow タスクログ:
Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
DAG の名前をクリックし、DAG 実行をクリックして詳細とログを表示します。
Airflow スケジューラ ログ:
[環境の詳細] ページに移動します。
[ログ] タブに移動します。
Airflow スケジューラ ログを調べます。
ユーザー ワークロード ログ:
[環境の詳細] ページに移動します。
[Monitoring] タブに移動します。
[ユーザー ワークロード] を選択します。
実行されたワークロードのリストを確認します。各ワークロードのログとリソース使用率情報を確認できます。
ゼロ以外の戻りコード
KubernetesPodOperator(および GKEStartPodOperator)を使用する場合、コンテナのエントリ ポイントの戻りコードから、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。
一般的なパターンは、コンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化することです。
このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。
Pod のタイムアウト
KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。
Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
手元のタスクを実行するために Managed Airflow サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。
大量のタスクが実行されると KubernetesPodOperator タスクが失敗する
環境で大量の KubernetesPodOperator タスクまたは KubernetesExecutor タスクが同時に実行されると、既存のタスクの一部が完了するまで、Managed Airflow(第 3 世代)は新しいタスクを受け入れません。
この問題のトラブルシューティングの詳細については、KubernetesExecutor タスクのトラブルシューティングをご覧ください。