KubernetesPodOperator を使用する

Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(以前の第 1 世代)

このページでは、KubernetesPodOperator を使用して、Managed Service for Apache Airflow から Managed Service for Apache Airflow 環境の一部である Google Kubernetes Engine クラスタに Kubernetes Pod をデプロイする方法について説明します。

KubernetesPodOperator は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。

KubernetesPodOperator は、以下を必要とする場合に適しています。

  • 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
  • Managed Airflow ワーカーのストック画像では使用できないバイナリ依存関係。

始める前に

Managed Airflow(第 3 世代)の KubernetesPodOperator と Managed Airflow(第 2 世代)の KubernetesPodOperator の違いの一覧を確認し、DAG が互換性があることを確認します。

  • Managed Airflow(第 3 世代)でカスタム名前空間を作成することはできません。Pod は、別の名前空間が指定されている場合でも、常に composer-user-workloads 名前空間で実行されます。この名前空間内の Pod は、追加の構成を行うことなくプロジェクトのリソースと VPC ネットワーク(有効な場合)にアクセスできます。

  • Managed Airflow(第 3 世代)では、複数の追加のサイドカー コンテナを実行することはできません。サイドカー コンテナの名前が airflow-xcom-sidecar の場合は、1 つのサイドカー コンテナを実行できます。

  • Kubernetes Secret と ConfigMap は Kubernetes API を使用して作成することができません。代わりに、Managed Airflow は、Kubernetes Secret と ConfigMap を管理するための Google Cloud CLI コマンド、Terraform リソース、Cloud Composer API を提供します。詳細については、Kubernetes Secretと ConfigMap を使用するをご覧ください。

  • Managed Airflow(第 3 世代)にカスタム ワークロードをデプロイすることはできません。変更できるのは Kubernetes Secret と ConfigMap のみで、他の構成変更はできません。

  • リソース要件(CPU、メモリ、ストレージ)は、サポートされている値を使用して指定する必要があります。

  • Managed Airflow(第 2 世代)と同様に、Pod アフィニティの構成は使用できません。Pod アフィニティを使用する場合は、GKE オペレーターを使用して、別のクラスタで Pod を起動します。

Managed Airflow(第 3 世代)の KubernetesPodOperator について

このセクションでは、Managed Airflow(第 3 世代)で KubernetesPodOperator がどのように機能するかについて説明します。

リソースの利用

Managed Airflow(第 3 世代)では、環境のクラスタは自動的にスケーリングされます。KubernetesPodOperator を使用して実行する追加のワークロードは、環境とは独立してスケーリングされます。環境は、リソース需要の増加による影響を受けませんが、環境のクラスタは、リソース需要に応じてスケールアップおよびスケールダウンされます。

環境のクラスタで実行する追加のワークロードの料金は Managed Airflow(第 3 世代)の料金モデルに従い、Managed Airflow(第 3 世代)の SKU を使用します。

Managed Airflow(第 3 世代)では、コンピューティング クラスの概念が導入されている Autopilot クラスタを使用します。

  • Managed Airflow は、general-purpose コンピューティング クラスのみをサポートしています。

  • デフォルトでは、クラスが選択されていない場合、KubernetesPodOperator を使用して Pod を作成する際には general-purpose クラスが想定されます。

  • 各クラスは特定のプロパティとリソース制限に関連付けられています。それらの詳細については、Autopilot のドキュメントをご覧ください。 たとえば、general-purpose クラス内で実行される Pod は、最大 110 GiB のメモリを使用できます。

プロジェクトのリソースへのアクセス

マネージド Airflow(第 3 世代)では、環境のクラスタはテナント プロジェクトに配置され、構成することはできません。Pod は環境のクラスタ内の分離された名前空間で実行されます。

Managed Airflow(第 3 世代)では、別の名前空間が指定されている場合でも、Pod は常に composer-user-workloads 名前空間で実行されます。この名前空間内の Pod は、追加の構成を行うことなく、プロジェクト内の Google Cloudリソースと VPC ネットワーク(有効になっている場合)にアクセスできます。これらのリソースへのアクセスには、環境のサービス アカウントが使用されます。別のサービス アカウントを指定することはできません。

構成を最小限にしたい

KubernetesPodOperator の作成に必要になるのは、Pod の name、使用する imagetask_id パラメータのみです。/home/airflow/composer_kube_config には、GKE に対する認証に必要な認証情報が含まれています。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="marketplace.gcr.io/google/ubuntu2204",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

追加構成

この例では、KubernetesPodOperator で構成できる追加のパラメータを示します。

詳細については、以下のリソースをご覧ください。

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Jinja テンプレートを使用する

Airflow は、DAG で Jinja テンプレートをサポートしています。

必要な Airflow パラメータ(task_idnameimage)を演算子で宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmdsargumentsenv_varsconfig_file など)をテンプレート化できます。

この例の env_vars パラメータは、my_value という名前の Airflow 変数から設定されます。例の DAG は、Airflow の vars テンプレート変数から値を取得します。Airflow には、さまざまな種類の情報にアクセスできる変数が他にもあります。たとえば、conf テンプレート変数を使用して、Airflow 構成オプションの値にアクセスできます。詳細と Airflow で使用可能な変数のリストについては、Airflow ドキュメントのテンプレート リファレンスをご覧ください。

DAG を変更するか env_vars 変数を作成しないと、変数が存在しないため、例の ex-kube-templates タスクは失敗します。この変数は、Airflow UI または Google Cloud CLI で作成します。

Airflow UI

  1. Airflow UI に移動します。

  2. ツールバーで、[管理者] > [変数] を選択します。

  3. [List Variable] ページで、[新しいレコードを追加する] をクリックします。

  4. [Add Variable] ページで、次の情報を入力します。

    • Key:my_value
    • Val: example_value
  5. [保存] をクリックします。

gcloud

次のコマンドを入力します。

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

以下のように置き換えます。

  • ENVIRONMENT を環境の名前に置き換えます。
  • LOCATION は、環境が配置されているリージョン。

次の例では、KubernetesPodOperator で Jinja テンプレートを使用する方法を示します。

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes Secret と ConfigMap を使用する

Kubernetes Secret は、センシティブ データを含むオブジェクトです。Kubernetes ConfigMap は、Key-Value ペアに非機密データを含むオブジェクトです。

Managed Airflow(第 3 世代)では、Google Cloud CLI、API、Terraform を使用して Secret と ConfigMap を作成し、KubernetesPodOperator からそれらにアクセスできます。

  • Google Cloud CLI と API を使用する場合は、YAML 構成ファイルを指定します。
  • Terraform では、Terraform 構成ファイルで Secret と ConfigMap を別個のリソースとして定義します。

YAML 構成ファイルについて

Google Cloud CLI と API を使用して Kubernetes Secret または ConfigMap を作成する場合は、YAML 形式のファイルを指定します。このファイルは、Kubernetes Secret と ConfigMap で使用される形式と同じ形式にする必要があります。Kubernetes ドキュメントには、ConfigMap と Secret の多くのコードサンプルが用意されています。使用を開始するには、Secret を使用して安全に認証情報を配布するページと ConfigMaps をご覧ください。

Kubernetes Secret の場合と同じように、Secret で値を定義する場合は base64 表現を使用します。

値をエンコードするには、次のコマンドを使用します(これは、Base64 でエンコードされた値を取得する多くの方法の一つです)。

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

出力:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

次の 2 つの YAML ファイルの例は、このガイドの後半のサンプルで使用されます。Kubernetes Secret の YAML 構成ファイルの例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

ファイルを含める方法を示す別の例。前の例と同様に、まずファイルの内容(cat ./key.json | base64)をエンコードしてから、YAML ファイルでこの値を指定します。

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap の YAML 構成ファイルの例。ConfigMap で base64 表現を使用する必要はありません。

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes Secret を管理する

gcloud

シークレットを作成

Kubernetes Secret を作成するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • SECRET_FILE: Secret の構成を含むローカル YAML ファイルへのパス。

例:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Secret を更新する

Kubernetes Secret を更新するには、次のコマンドを実行します。Secret の名は指定された YAML ファイルから取得され、Secret のコンテンツが置き換えられます。

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • SECRET_FILE: Secret の構成を含むローカル YAML ファイルへのパス。このファイルの metadata > name フィールドに Secret の名を指定します。

シークレットの一覧表示

環境の Secrets とそのフィールドの一覧を取得するには、次のコマンドを実行します。出力におけるキー値はアスタリスクに置き換えられます。

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

Secret の詳細を取得する

Secret の詳細情報を取得するには、次のコマンドを実行します。出力におけるキー値はアスタリスクに置き換えられます。

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • SECRET_NAME: Secret の名前。Secret の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

Secret を削除する

Secret を削除するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: Secret の名前。Secret の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

API

シークレットを作成

  1. environments.userWorkloadsSecrets.create API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエストの本文の name フィールドに、新しい Secret の URI を指定します。
    2. リクエスト本文の data フィールドに、Secret のキーと Base64 でエンコードされた値を指定します。

例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Secret を更新する

  1. environments.userWorkloadsSecrets.update API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエストの本文の name フィールドに、Secret の URI を指定します。
    2. リクエスト本文の data フィールドに、Secret のキーと Base64 でエンコードされた値を指定します。値が置き換えられます。

例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

シークレットの一覧表示

environments.userWorkloadsSecrets.list API リクエストを作成します。出力に含まれるキー値はアスタリスクに置き換えられます。このリクエストでページネーションを使用できます。詳細については、リクエストのリファレンスをご覧ください。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Secret の詳細を取得する

environments.userWorkloadsSecrets.get API リクエストを作成します。出力に含まれるキー値はアスタリスクに置き換えられます。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Secret を削除する

environments.userWorkloadsSecrets.delete API リクエストを作成します。

例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

google_composer_user_workloads_secret リソースは、data ブロックで定義された キー と値を持つ Kubernetes Secret を定義します。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Terraform での環境の定義を含む、環境のリソースの名前。実際の環境の名前もこのリソースで指定されます。
  • LOCATION: 環境が配置されているリージョン。
  • SECRET_NAME: Secret の名前。
  • KEY_NAME: この Secret の 1 つ以上のキー。
  • KEY_VALUE: キーの Base64 エンコード値。base64encode 関数を使用して値をエンコードできます(例を参照)。

Kubernetes Secret の次の 2 つの例は、このガイドの後半のサンプルで使用します。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

ファイルを含める方法を示す別の例。file 関数を使用すると、ファイルの内容を文字列として読み取り、Base64 でエンコードできます。

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

DAG で Kubernetes Secret を使用する

この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。

最初の Secret である airflow-secrets は、SQL_CONN という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Managed Airflow の環境変数とは異なります)。

2 番目の Secret である service-account は、サービス アカウント トークンを含むファイルである service-account.json/var/secrets/google にマウントします。

Secret オブジェクトは次のようになります。

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

最初の Kubernetes Secret の名前は、secret_env 変数で定義されます。この Secret の名前は airflow-secrets です。deploy_type パラメータでは、環境変数として公開する必要があることを指定します。環境変数の名前は SQL_CONN で、deploy_target パラメータで指定されます。最後に、SQL_CONN 環境変数の値が sql_alchemy_conn キーの値に設定されます。

2 番目の Kubernetes Secret の名前は、secret_volume 変数で定義されています。この Secret の名前は service-account です。deploy_type で指定されるように、ボリュームとして公開されます。マウントするファイルのパス deploy_target/var/secrets/google です。最後に、deploy_target に保管される Secret の keyservice-account.json です。

演算子の構成は、次のようになります。

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="marketplace.gcr.io/google/ubuntu2204",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes ConfigMap を管理する

gcloud

ConfigMap の作成

ConfigMap を作成するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • CONFIG_MAP_FILE: ConfigMap の構成を含むローカル YAML ファイルへのパス。

例:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

ConfigMap を更新する

ConfigMap を更新するには、次のコマンドを実行します。ConfigMap の名前は指定された YAML ファイルから取得され、ConfigMap のコンテンツが置き換えられます。

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • CONFIG_MAP_FILE: ConfigMap の構成を含むローカル YAML ファイルへのパス。このファイルの metadata > name フィールドに ConfigMap の名を指定します。

ConfigMap の一覧表示

環境の ConfigMap とそのフィールドの一覧を取得するには、次のコマンドを実行します。出力に含まれるキー値はそのまま表示されます。

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

ConfigMap の詳細を取得する

ConfigMap の詳細情報を取得するには、次のコマンドを実行します。出力に含まれるキー値はそのまま表示されます。

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • CONFIG_MAP_NAME: ConfigMap の名前。ConfigMap の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

ConfigMap を削除する

ConfigMap を削除するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: ConfigMap の名前。ConfigMap の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

API

ConfigMap の作成

  1. environments.userWorkloadsConfigMaps.create API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエスト本文の name フィールドに、新しい ConfigMap の URI を指定します。
    2. リクエスト本文の data フィールドに、ConfigMap のキーと値を指定します。

例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

ConfigMap を更新する

  1. environments.userWorkloadsConfigMaps.update API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエスト本文の name フィールドに、ConfigMap の URI を指定します。
    2. リクエスト本文の data フィールドに、ConfigMap のキーと値を指定します。 値が置き換えられます。

例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

ConfigMap の一覧表示

environments.userWorkloadsConfigMaps.list API リクエストを作成します。出力に含まれるキー値はそのまま表示されます。このリクエストでページネーションを使用できます。詳細については、リクエストのリファレンスをご覧ください。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

ConfigMap の詳細を取得する

environments.userWorkloadsConfigMaps.get API リクエストを作成します。出力に含まれるキー値はそのまま表示されます。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

ConfigMap を削除する

environments.userWorkloadsConfigMaps.delete API リクエストを作成します。

例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

google_composer_user_workloads_config_map リソースでは ConfigMap を定義します。キーと値は data ブロックで定義します。

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Terraform での環境の定義を含む、環境のリソースの名前。実際の環境の名前もこのリソースで指定されます。
  • LOCATION: 環境が配置されているリージョン。
  • CONFIG_MAP_NAME: ConfigMap の名前。
  • KEY_NAME: この ConfigMap の 1 つ以上のキー。
  • KEY_VALUE: キーの値。

例:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

DAG で ConfigMap を使用する

この例では、DAG で ConfigMap を使用する方法を示します。

次の例では、ConfigMap が configmaps パラメータで渡されます。この ConfigMap のすべてのキーは、環境変数として使用できます。

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

次の例では、ConfigMap をボリュームとしてマウントする方法を示します。

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

CNCF Kubernetes プロバイダに関する情報

KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes プロバイダに実装されています。

CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。

リソース要件

Managed Airflow(第 3 世代)は、リソース要件に対して次の値をサポートしています。リソース要件の使用例については、追加の構成をご覧ください。

リソース 最小 最大 ステップ
CPU 0.25 32 段階値: 0.25、0.5、1、2、4、6、8、10、...、32。リクエストされた値は、サポートされている最も近い段階値に切り上げられます(例: 5 は 6 に切り上げられます)。
メモリ 2G(GB) 128G(GB) 段階値: 2、3、4、5、...、128。リクエストされた値は、サポートされている最も近い段階値に切り上げられます(3.5G は 4G に切り上げられます)。
ストレージ - 100G(GB) 任意の値。100 GB を超える容量がリクエストされた場合は、100 GB のみが提供されます。

Kubernetes のリソース ユニットの詳細については、Kubernetes のリソース ユニットをご覧ください。

トラブルシューティング

このセクションでは、KubernetesPodOperator の一般的な問題のトラブルシューティングに関するアドバイスを提供します。

ログを表示

問題のトラブルシューティング時には、次の順序でログを確認できます。

  1. Airflow タスクログ:

    1. Google Cloud コンソールで、[環境] ページに移動します。

      [環境] に移動

    2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    3. [DAG] タブに移動します。

    4. DAG の名前をクリックし、DAG 実行をクリックして詳細とログを表示します。

  2. Airflow スケジューラ ログ:

    1. [環境の詳細] ページに移動します。

    2. [ログ] タブに移動します。

    3. Airflow スケジューラ ログを調べます。

  3. ユーザー ワークロード ログ:

    1. [環境の詳細] ページに移動します。

    2. [Monitoring] タブに移動します。

    3. [ユーザー ワークロード] を選択します。

    4. 実行されたワークロードのリストを確認します。各ワークロードのログとリソース使用率情報を確認できます。

ゼロ以外の戻りコード

KubernetesPodOperator(および GKEStartPodOperator)を使用する場合、コンテナのエントリ ポイントの戻りコードから、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。

一般的なパターンは、コンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化することです。

このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。

Pod のタイムアウト

KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。

Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

手元のタスクを実行するために Managed Airflow サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。

大量のタスクが実行されると KubernetesPodOperator タスクが失敗する

環境で大量の KubernetesPodOperator タスクまたは KubernetesExecutor タスクが同時に実行されると、既存のタスクの一部が完了するまで、Managed Airflow(第 3 世代)は新しいタスクを受け入れません。

この問題のトラブルシューティングの詳細については、KubernetesExecutor タスクのトラブルシューティングをご覧ください。

次のステップ