KubernetesPodOperator を使用する

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、KubernetesPodOperator を使用して、Cloud Composer から Cloud Composer 環境の一部である Google Kubernetes Engine クラスタに Kubernetes Pod をデプロイする方法について説明します。

KubernetesPodOperator は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。

KubernetesPodOperator は、以下を必要とする場合に適しています。

  • 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
  • Cloud Composer ワーカーのストック画像では使用できないバイナリ依存関係。

始める前に

  • CNCF Kubernetes プロバイダのバージョン 5.0.0 を使用している場合は、CNCF Kubernetes プロバイダのセクションに記載されている手順に沿って操作します。

  • Pod アフィニティの構成は Cloud Composer 2 では使用できません。Pod アフィニティを使用する場合は、GKE オペレーターを使用して、別のクラスタで Pod を起動します。

Cloud Composer 2 の KubernetesPodOperator について

このセクションでは、Cloud Composer 2 で KubernetesPodOperator がどのように機能するかについて説明します。

リソースの使用量

Cloud Composer 2 では、お使いの環境のクラスタは自動的にスケーリングされます。KubernetesPodOperator を使用して実行する追加のワークロードは、環境とは独立してスケーリングされます。

環境は、リソース需要の増加による影響を受けませんが、環境のクラスタは、リソース需要に応じてスケールアップおよびスケールダウンされます。

環境のクラスタで実行する追加のワークロードの料金は Cloud Composer 2 料金モデルに従い、Cloud Composer Compute SKU を使用します。

Cloud Composer 2 では、コンピューティング クラスの概念が導入されている Autopilot クラスタを使用します。

  • Cloud Composer は、general-purpose コンピューティング クラスのみをサポートしています。

  • デフォルトでは、クラスが選択されていない場合、KubernetesPodOperator を使用して Pod を作成する際には general-purpose クラスが想定されます。

  • 各クラスは特定のプロパティとリソース制限に関連付けられています。それらの詳細については、Autopilot のドキュメントをご覧ください。 たとえば、general-purpose クラス内で実行される Pod は、最大 110 GiB のメモリを使用できます。

プロジェクトのリソースへのアクセス

Cloud Composer 2 は、Workload Identity Federation for GKE で GKE クラスタを使用します。composer-user-workloads 名前空間で実行される Pod は、追加の構成なしでプロジェクト内の Google Cloud リソースにアクセスできます。これらのリソースへのアクセスには、環境のサービス アカウントが使用されます。

カスタム名前空間を使用する場合は、この名前空間に関連付けられた Kubernetes サービス アカウントを Google Cloud サービス アカウントにマッピングして、Google API やその他のサービスへのリクエストに対してサービス ID の承認を有効にする必要があります。環境のクラスタ内のカスタム名前空間で Pod を実行すると、Kubernetes とGoogle Cloud サービス アカウント間の IAM バインディングが作成されず、これらの Pod は Google Cloud プロジェクトのリソースにアクセスできません。

カスタム名前空間を使用していて、Pod がGoogle Cloud リソースにアクセスできるようにするには、Workload Identity Federation for GKE のガイダンスに沿って、カスタム名前空間のバインディングを設定します。

  1. 環境のクラスタに別の名前空間を作成します。
  2. カスタム名前空間の Kubernetes サービス アカウントと環境のサービス アカウントの間にバインディングを作成します。
  3. 環境のサービス アカウント アノテーションを Kubernetes サービス アカウントに追加します。
  4. KubernetesPodOperator を使用する場合は、namespace パラメータと service_account_name パラメータで名前空間と Kubernetes サービス アカウントを指定します。

構成を最小限にしたい

KubernetesPodOperator の作成に必要になるのは、Pod の name、使用する imagetask_id パラメータのみです。/home/airflow/composer_kube_config には、GKE に対する認証に必要な認証情報が含まれています。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

追加構成

この例では、KubernetesPodOperator で構成できる追加のパラメータを示します。

詳細については、以下のリソースをご覧ください。

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Jinja テンプレートを使用する

Airflow は、DAG で Jinja テンプレートをサポートしています。

必要な Airflow パラメータ(task_idnameimage)を演算子で宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmdsargumentsenv_varsconfig_file など)をテンプレート化できます。

この例の env_vars パラメータは、my_value という名前の Airflow 変数から設定されます。例の DAG は、Airflow の vars テンプレート変数から値を取得します。Airflow には、さまざまな種類の情報にアクセスできる変数が他にもあります。たとえば、conf テンプレート変数を使用して、Airflow 構成オプションの値にアクセスできます。詳細と Airflow で使用可能な変数のリストについては、Airflow ドキュメントのテンプレート リファレンスをご覧ください。

DAG を変更するか env_vars 変数を作成しないと、変数が存在しないため、例の ex-kube-templates タスクは失敗します。この変数は、Airflow UI または Google Cloud CLI で作成します。

Airflow UI

  1. Airflow UI に移動します。

  2. ツールバーで、[管理者] > [変数] を選択します。

  3. [List Variable] ページで、[新しいレコードを追加する] をクリックします。

  4. [Add Variable] ページで、次の情報を入力します。

    • Key:my_value
    • Val: example_value
  5. [保存] をクリックします。

gcloud

次のコマンドを入力します。

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

以下のように置き換えます。

  • ENVIRONMENT を環境の名前に置き換えます。
  • LOCATION は、環境が配置されているリージョン。

次の例では、KubernetesPodOperator で Jinja テンプレートを使用する方法を示します。

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes Secret と ConfigMap を使用する

Kubernetes Secret は、センシティブ データを含むオブジェクトです。Kubernetes ConfigMap は、Key-Value ペアに非機密データを含むオブジェクトです。

Cloud Composer 2 では、Google Cloud CLI、API、Terraform を使用して Secret と ConfigMap を作成し、KubernetesPodOperator からそれらにアクセスできます。

YAML 構成ファイルについて

Google Cloud CLI と API を使用して Kubernetes Secret または ConfigMap を作成する場合は、YAML 形式のファイルを指定します。このファイルは、Kubernetes Secret と ConfigMap で使用される形式と同じ形式にする必要があります。Kubernetes ドキュメントには、ConfigMap と Secret の多くのコードサンプルが用意されています。使用を開始するには、Secret を使用して安全に認証情報を配布するページと ConfigMaps をご覧ください。

Kubernetes Secret の場合と同じように、Secret で値を定義する場合は base64 表現を使用します。

値をエンコードするには、次のコマンドを使用します(これは、Base64 でエンコードされた値を取得する多くの方法の一つです)。

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

出力:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

次の 2 つの YAML ファイルの例は、このガイドの後半のサンプルで使用されます。Kubernetes Secret の YAML 構成ファイルの例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

ファイルを含める方法を示す別の例。前の例と同様に、まずファイルの内容(cat ./key.json | base64)をエンコードしてから、YAML ファイルでこの値を指定します。

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap の YAML 構成ファイルの例。ConfigMap で base64 表現を使用する必要はありません。

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes Secret を管理する

Cloud Composer 2 では、Google Cloud CLI と kubectl を使用して Secret を作成します。

  1. 環境のクラスタに関する情報を取得する

    1. 次のコマンドを実行します。

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      以下のように置き換えます。

      • ENVIRONMENT は、環境の名前です。
      • LOCATION は、Cloud Composer 環境が配置されているリージョンです。

      このコマンドの出力では、projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id> の形式が使用されます。

    2. GKE クラスタ ID を取得するには、/clusters/ の後の出力(文末は -gke)をコピーします。

  2. 次のコマンドを使用して GKE クラスタに接続します。

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    以下を置き換えます。

    • CLUSTER_ID: 環境のクラスタ ID。
    • PROJECT_ID: プロジェクト ID
    • LOCATION: 環境が配置されているリージョン。

  3. Kubernetes Secret を作成します。

    次のコマンドは、Kubernetes Secret を作成するための 2 つの方法を示しています。--from-literal の方法では、Key-Value ペアを使用します。--from-file の方法では、ファイルの内容を使用します。

    • Key-Value ペアを指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、test_value の値を持つ sql_alchemy_conn フィールドがある airflow-secrets という名前の Secret を作成します。

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    • ファイルの内容を指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、ローカル ./key.json ファイルの内容から取得した値を持つ service-account.json フィールドがある service-account という名前の Secret を作成します。

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

DAG で Kubernetes Secret を使用する

この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。

最初の Secret である airflow-secrets は、SQL_CONN という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Cloud Composer の環境変数とは異なります)。

2 番目の Secret である service-account は、サービス アカウント トークンを含むファイルである service-account.json/var/secrets/google にマウントします。

Secret オブジェクトは次のようになります。

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

最初の Kubernetes Secret の名前は、secret_env 変数で定義されます。この Secret の名前は airflow-secrets です。deploy_type パラメータでは、環境変数として公開する必要があることを指定します。環境変数の名前は SQL_CONN で、deploy_target パラメータで指定されます。最後に、SQL_CONN 環境変数の値が sql_alchemy_conn キーの値に設定されます。

2 番目の Kubernetes Secret の名前は、secret_volume 変数で定義されています。この Secret の名前は service-account です。deploy_type で指定されるように、ボリュームとして公開されます。マウントするファイルのパス deploy_target/var/secrets/google です。最後に、deploy_target に保管される Secret の keyservice-account.json です。

演算子の構成は、次のようになります。

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

CNCF Kubernetes プロバイダに関する情報

KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes プロバイダに実装されています。

CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。

バージョン 6.0.0

CNCF Kubernetes プロバイダ パッケージ バージョン 6.0.0 では、KubernetesPodOperator で kubernetes_default 接続がデフォルトで使用されます。

バージョン 5.0.0 でカスタム接続を指定した場合も、このカスタム接続はオペレータによって引き続き使用されます。kubernetes_default 接続を使用するように戻すには、DAG を適宜調整しなければならない場合があります。

バージョン 5.0.0

このバージョンでは、バージョン 4.4.0 と比較して下位互換性のない変更がいくつか導入されています。最も重要なのは、バージョン 5.0.0 で使用されない kubernetes_default 接続に関連した変更です。

  • kubernetes_default 接続を変更する必要があります。Kubernetes 構成パスは /home/airflow/composer_kube_config に設定する必要があります(次の図を参照)。または、config_file を KubernetesPodOperator 構成に追加する必要があります(次のコード例を参照)。
Airflow UI の Kube 構成パスフィールド
図 1Airflow UI、kubernetes_default 接続を変更する(クリックして拡大)
  • 次の方法で KubernetesPodOperator を使用してタスクのコードを変更します。
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

バージョン 5.0.0 の詳細については、CNCF Kubernetes プロバイダ リリースノートをご覧ください。

トラブルシューティング

このセクションでは、KubernetesPodOperator の一般的な問題のトラブルシューティングに関するアドバイスを提供します。

ログを表示

問題のトラブルシューティング時には、次の順序でログを確認できます。

  1. Airflow タスクログ:

    1. Google Cloud コンソールで、[環境] ページに移動します。

      [環境] に移動

    2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    3. [DAG] タブに移動します。

    4. DAG の名前をクリックし、DAG 実行をクリックして詳細とログを表示します。

  2. Airflow スケジューラ ログ:

    1. [環境の詳細] ページに移動します。

    2. [ログ] タブに移動します。

    3. Airflow スケジューラ ログを調べます。

  3. Google Cloud コンソールで GKE ワークロードの下にある Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。

ゼロ以外の戻りコード

KubernetesPodOperator(および GKEStartPodOperator)を使用する場合、コンテナのエントリ ポイントの戻りコードから、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。

一般的なパターンは、コンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化することです。

このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。

Pod のタイムアウト

KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。

Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。例:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

手元のタスクを実行するために Cloud Composer サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。

新しい接続の確立に失敗した

GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

クラスタがアップグレードされているかどうかを確認するには、 Google Cloud コンソールで [Kubernetes クラスタ] ページに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。

次のステップ