KubernetesPodOperator を使用する

Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(以前の第 1 世代)

このページでは、KubernetesPodOperator を使用して、Managed Service for Apache Airflow から Managed Service for Apache Airflow 環境の一部である Google Kubernetes Engine クラスタに Kubernetes Pod をデプロイする方法について説明します。

KubernetesPodOperator は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。

KubernetesPodOperator は、以下を必要とする場合に適しています。

  • 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
  • Managed Airflow ワーカーのストック画像では使用できないバイナリ依存関係。

始める前に

  • Managed Airflow の最新バージョンを使用することをおすすめします。このバージョンは、少なくとも非推奨とサポート ポリシーの一部としてサポートされている必要があります。
  • 環境に十分なリソースが存在することを確認してください。リソースが不足している Pod を環境で起動すると、Airflow ワーカーと Airflow スケジューラでエラーが発生する可能性があります。

Managed Service for Apache Airflow 環境リソースを設定する

Managed Airflow 環境を作成する際には、環境のクラスタのパフォーマンス パラメータなどのパフォーマンス パラメータを指定します。Kubernetes Pod を環境クラスタで起動すると、CPU やメモリなどのクラスタ リソースの競合が発生する可能性があります。Airflow スケジューラと Airflow ワーカーは同じ GKE クラスタにあるため、競合によってリソース不足になった場合は、スケジューラとワーカーは正常に動作しません。

リソースの不足を防ぐには、次の操作を行います。

ノードプールを作成

マネージド Airflow 環境でリソースの不足を防ぐには、新しいノードプールを作成し、そのプールからのリソースのみを使用して実行するように Kubernetes Pod を構成します。

コンソール

  1. Google Cloud コンソールで、[環境] ページに移動します。

    [環境] に移動

  2. 環境の名前をクリックします。

  3. [環境の詳細] ページで [環境の構成] タブに移動します。

  4. [リソース] > [GKE クラスタ] セクションで、[クラスタの詳細を表示] リンクをクリックします。

  5. ノードプールの追加の説明に沿って、ノードプールを作成します。

gcloud

  1. 環境のクラスタの名前を決定します。

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    以下のように置き換えます。

    • ENVIRONMENT_NAME を環境の名前に置き換えます。
    • LOCATION は、環境が配置されているリージョン。
  2. 出力には、環境のクラスタの名前が含まれます。例: europe-west3-example-enviro-af810e25-gke

  3. ノードプールの追加の説明に沿って、ノードプールを作成します。

環境内のノード数を増やす

Managed Airflow 環境でノード数を増やすと、ワークロードが使用できるコンピューティング能力が増加します。指定したマシンタイプに用意されているよりも多くの CPU または RAM を必要とするタスクには、この増加によって追加のリソースは割り当てられません。

ノード数を増やすには、環境を更新してください

適切なマシンタイプを指定する

Managed Airflow 環境の作成の際に、マシンタイプを指定できます。使用可能なリソースを確保するには、Managed Airflow 環境で行うコンピューティングの種類のマシンタイプを指定します。

構成を最小限にしたい

KubernetesPodOperator の作成に必要になるのは、Pod の name、使用する imagetask_id パラメータのみです。/home/airflow/composer_kube_config には、GKE に対する認証に必要な認証情報が含まれています。

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="marketplace.gcr.io/google/ubuntu2204",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="marketplace.gcr.io/google/ubuntu2204",
)

Pod アフィニティの構成

KubernetesPodOperator で affinity パラメータを構成する際に、Pod のスケジュールを設定する対象ノードを制御します(特定のノードプール内のノードなど)。この例では、演算子は pool-0pool-1 という名前のノードプールでのみ実行されます。Managed Airflow(以前の Gen 1)環境のノードは default-pool にあるため、Pod は環境内のノードで実行されません。

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

例のように構成されている場合、タスクは失敗します。ログを確認すると、ノードプール pool-0pool-1 が存在しないためタスクが失敗しています。

values のノードプールが存在することを確認するために、次のいずれかの構成の変更を行います。

  • ノードプールを以前に作成した場合は、pool-0pool-1 をノードプールの名前に置き換えてからもう一度 DAG をアップロードします。

  • pool-0 または pool-1 という名前のノードプールを作成します。両方のノードプールを作成できますが、タスクを成功させるために必要なのは 1 つのみです。

  • pool-0pool-1default-pool に置き換えます。これは、Airflow が使用するデフォルトのプールです。もう一度 DAG をアップロードします。

変更を行ったら、環境が更新されるまで数分待ちます。次に、ex-pod-affinity タスクを再度実行し、ex-pod-affinity タスクが成功することを確認します。

追加構成

この例では、KubernetesPodOperator で構成できる追加のパラメータを示します。

詳細については、以下のリソースをご覧ください。

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Jinja テンプレートを使用する

Airflow は、DAG で Jinja テンプレートをサポートしています。

必要な Airflow パラメータ(task_idnameimage)を演算子で宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmdsargumentsenv_varsconfig_file など)をテンプレート化できます。

この例の env_vars パラメータは、my_value という名前の Airflow 変数から設定されます。例の DAG は、Airflow の vars テンプレート変数から値を取得します。Airflow には、さまざまな種類の情報にアクセスできる変数が他にもあります。たとえば、conf テンプレート変数を使用して、Airflow 構成オプションの値にアクセスできます。詳細と Airflow で使用可能な変数のリストについては、Airflow ドキュメントのテンプレート リファレンスをご覧ください。

DAG を変更するか env_vars 変数を作成しないと、変数が存在しないため、例の ex-kube-templates タスクは失敗します。この変数は、Airflow UI または Google Cloud CLI で作成します。

Airflow UI

  1. Airflow UI に移動します。

  2. ツールバーで、[管理者] > [変数] を選択します。

  3. [List Variable] ページで、[新しいレコードを追加する] をクリックします。

  4. [Add Variable] ページで、次の情報を入力します。

    • Key:my_value
    • Val: example_value
  5. [保存] をクリックします。

環境で Airflow 1 を使用している場合は、代わりに次のコマンドを実行します。

  1. Airflow UI に移動します。

  2. ツールバーで、[管理者] > [変数] を選択します。

  3. [Variables] ページで、[Create] タブをクリックします。

  4. [Variable] ページで、次の情報を入力します。

    • Key:my_value
    • Val: example_value
  5. [保存] をクリックします。

gcloud

次のコマンドを入力します。

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

環境で Airflow 1 を使用している場合は、代わりに次のコマンドを実行します。

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

以下のように置き換えます。

  • ENVIRONMENT を環境の名前に置き換えます。
  • LOCATION は、環境が配置されているリージョン。

次の例では、KubernetesPodOperator で Jinja テンプレートを使用する方法を示します。

Airflow 2

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Kubernetes Secret と ConfigMap を使用する

Kubernetes Secret は、センシティブ データを含むオブジェクトです。Kubernetes ConfigMap は、Key-Value ペアに非機密データを含むオブジェクトです。

Managed Airflow(第 2 世代)では、Google Cloud CLI、API、Terraform を使用して Secret と ConfigMap を作成し、KubernetesPodOperator からそれらにアクセスできます。

YAML 構成ファイルについて

Google Cloud CLI と API を使用して Kubernetes Secret または ConfigMap を作成する場合は、YAML 形式のファイルを指定します。このファイルは、Kubernetes Secret と ConfigMap で使用される形式と同じ形式にする必要があります。Kubernetes ドキュメントには、ConfigMap と Secret の多くのコードサンプルが用意されています。使用を開始するには、Secret を使用して安全に認証情報を配布するページと ConfigMaps をご覧ください。

Kubernetes Secret の場合と同じように、Secret で値を定義する場合は base64 表現を使用します。

値をエンコードするには、次のコマンドを使用します(これは、Base64 でエンコードされた値を取得する多くの方法の一つです)。

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

出力:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

次の 2 つの YAML ファイルの例は、このガイドの後半のサンプルで使用されます。Kubernetes Secret の YAML 構成ファイルの例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

ファイルを含める方法を示す別の例。前の例と同様に、まずファイルの内容(cat ./key.json | base64)をエンコードしてから、YAML ファイルでこの値を指定します。

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap の YAML 構成ファイルの例。ConfigMap で base64 表現を使用する必要はありません。

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes Secret を管理する

マネージド Airflow(第 2 世代)では、Google Cloud CLI と kubectl を使用して Secret を作成します。

  1. 環境のクラスタに関する情報を取得する

    1. 次のコマンドを実行します。

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      以下のように置き換えます。

      • ENVIRONMENT は、環境の名前です。
      • LOCATION は、Managed Airflow 環境が配置されているリージョンに置き換えます。

      このコマンドの出力では、projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id> の形式が使用されます。

    2. GKE クラスタ ID を取得するには、/clusters/ の後の出力(文末は -gke)をコピーします。

    3. ゾーンを取得するには、/zones/ の後の出力をコピーします。

  2. 次のコマンドを使用して GKE クラスタに接続します。

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    以下のように置き換えます。

    • CLUSTER_ID: 環境のクラスタ ID。
    • PROJECT_ID: プロジェクト ID
    • ZONE は、環境のクラスタが配置されているゾーンに置き換えます。
  3. Kubernetes Secret を作成します。

    次のコマンドは、Kubernetes Secret を作成するための 2 つの方法を示しています。--from-literal の方法では、Key-Value ペアを使用します。--from-file の方法では、ファイルの内容を使用します。

    • Key-Value ペアを指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、test_value の値を持つ sql_alchemy_conn フィールドがある airflow-secrets という名前の Secret を作成します。

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • ファイルの内容を指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、ローカル ./key.json ファイルの内容から取得した値を持つ service-account.json フィールドがある service-account という名前の Secret を作成します。

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

DAG で Kubernetes Secret を使用する

この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。

最初の Secret である airflow-secrets は、SQL_CONN という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Managed Airflow の環境変数とは異なります)。

2 番目の Secret である service-account は、サービス アカウント トークンを含むファイルである service-account.json/var/secrets/google にマウントします。

Secret オブジェクトは次のようになります。

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

最初の Kubernetes Secret の名前は、secret_env 変数で定義されます。この Secret の名前は airflow-secrets です。deploy_type パラメータでは、環境変数として公開する必要があることを指定します。環境変数の名前は SQL_CONN で、deploy_target パラメータで指定されます。最後に、SQL_CONN 環境変数の値が sql_alchemy_conn キーの値に設定されます。

2 番目の Kubernetes Secret の名前は、secret_volume 変数で定義されています。この Secret の名前は service-account です。deploy_type で指定されるように、ボリュームとして公開されます。マウントするファイルのパス deploy_target/var/secrets/google です。最後に、deploy_target に保管される Secret の keyservice-account.json です。

演算子の構成は、次のようになります。

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

CNCF Kubernetes プロバイダに関する情報

KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes プロバイダに実装されています。

CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。

バージョン 6.0.0

CNCF Kubernetes プロバイダ パッケージ バージョン 6.0.0 では、KubernetesPodOperator で kubernetes_default 接続がデフォルトで使用されます。

バージョン 5.0.0 でカスタム接続を指定した場合も、このカスタム接続はオペレータによって引き続き使用されます。kubernetes_default 接続を使用するように戻すには、DAG を適宜調整しなければならない場合があります。

バージョン 5.0.0

このバージョンでは、バージョン 4.4.0 と比較して下位互換性のない変更がいくつか導入されています。最も重要なのは、バージョン 5.0.0 で使用されない kubernetes_default 接続に関連した変更です。

  • kubernetes_default 接続を変更する必要があります。Kubernetes 構成パスは /home/airflow/composer_kube_config に設定する必要があります(次の図を参照)。または、config_file を KubernetesPodOperator 構成に追加する必要があります(次のコード例を参照)。
Airflow UI の Kube 構成パスフィールド
図 1Airflow UI、kubernetes_default 接続を変更する(クリックして拡大)
  • 次の方法で KubernetesPodOperator を使用してタスクのコードを変更します。
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

バージョン 5.0.0 の詳細については、CNCF Kubernetes プロバイダ リリースノートをご覧ください。

トラブルシューティング

このセクションでは、KubernetesPodOperator の一般的な問題のトラブルシューティングに関するアドバイスを提供します。

ログを表示

問題のトラブルシューティング時には、次の順序でログを確認できます。

  1. Airflow タスクログ:

    1. Google Cloud コンソールで、[環境] ページに移動します。

      [環境] に移動

    2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    3. [DAG] タブに移動します。

    4. DAG の名前をクリックし、DAG 実行をクリックして詳細とログを表示します。

  2. Airflow スケジューラ ログ:

    1. [環境の詳細] ページに移動します。

    2. [ログ] タブに移動します。

    3. Airflow スケジューラ ログを調べます。

  3. Google Cloud コンソールで GKE ワークロードの下にある Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。

ゼロ以外の戻りコード

KubernetesPodOperator(および GKEStartPodOperator)を使用する場合、コンテナのエントリ ポイントの戻りコードから、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。

一般的なパターンは、コンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化することです。

このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。

Pod のタイムアウト

KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。

Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

手元のタスクを実行するために Managed Airflow サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。

新しい接続の確立に失敗した

GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

クラスタがアップグレードされているかどうかを確認するには、 Google Cloud コンソールで [Kubernetes クラスタ] ページに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。

次のステップ