Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(以前の第 1 世代)
このページでは、KubernetesPodOperator を使用して、Managed Service for Apache Airflow から Managed Service for Apache Airflow 環境の一部である Google Kubernetes Engine クラスタに Kubernetes Pod をデプロイする方法について説明します。
KubernetesPodOperator は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。
KubernetesPodOperator は、以下を必要とする場合に適しています。
- 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
- Managed Airflow ワーカーのストック画像では使用できないバイナリ依存関係。
始める前に
- Managed Airflow の最新バージョンを使用することをおすすめします。このバージョンは、少なくとも非推奨とサポート ポリシーの一部としてサポートされている必要があります。
- 環境に十分なリソースが存在することを確認してください。リソースが不足している Pod を環境で起動すると、Airflow ワーカーと Airflow スケジューラでエラーが発生する可能性があります。
Managed Service for Apache Airflow 環境リソースを設定する
Managed Airflow 環境を作成する際には、環境のクラスタのパフォーマンス パラメータなどのパフォーマンス パラメータを指定します。Kubernetes Pod を環境クラスタで起動すると、CPU やメモリなどのクラスタ リソースの競合が発生する可能性があります。Airflow スケジューラと Airflow ワーカーは同じ GKE クラスタにあるため、競合によってリソース不足になった場合は、スケジューラとワーカーは正常に動作しません。
リソースの不足を防ぐには、次の操作を行います。
ノードプールを作成
マネージド Airflow 環境でリソースの不足を防ぐには、新しいノードプールを作成し、そのプールからのリソースのみを使用して実行するように Kubernetes Pod を構成します。
コンソール
gcloud
環境のクラスタの名前を決定します。
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"以下のように置き換えます。
ENVIRONMENT_NAMEを環境の名前に置き換えます。LOCATIONは、環境が配置されているリージョン。
出力には、環境のクラスタの名前が含まれます。例:
europe-west3-example-enviro-af810e25-gkeノードプールの追加の説明に沿って、ノードプールを作成します。
環境内のノード数を増やす
Managed Airflow 環境でノード数を増やすと、ワークロードが使用できるコンピューティング能力が増加します。指定したマシンタイプに用意されているよりも多くの CPU または RAM を必要とするタスクには、この増加によって追加のリソースは割り当てられません。
ノード数を増やすには、環境を更新してください。
適切なマシンタイプを指定する
Managed Airflow 環境の作成の際に、マシンタイプを指定できます。使用可能なリソースを確保するには、Managed Airflow 環境で行うコンピューティングの種類のマシンタイプを指定します。
構成を最小限にしたい
KubernetesPodOperator の作成に必要になるのは、Pod の name、使用する image、task_id パラメータのみです。/home/airflow/composer_kube_config には、GKE に対する認証に必要な認証情報が含まれています。
Airflow 2
Airflow 1
Pod アフィニティの構成
KubernetesPodOperator で affinity パラメータを構成する際に、Pod のスケジュールを設定する対象ノードを制御します(特定のノードプール内のノードなど)。この例では、演算子は pool-0 や pool-1 という名前のノードプールでのみ実行されます。Managed Airflow(以前の Gen 1)環境のノードは default-pool にあるため、Pod は環境内のノードで実行されません。
Airflow 2
Airflow 1
例のように構成されている場合、タスクは失敗します。ログを確認すると、ノードプール pool-0 と pool-1 が存在しないためタスクが失敗しています。
values のノードプールが存在することを確認するために、次のいずれかの構成の変更を行います。
ノードプールを以前に作成した場合は、
pool-0とpool-1をノードプールの名前に置き換えてからもう一度 DAG をアップロードします。pool-0またはpool-1という名前のノードプールを作成します。両方のノードプールを作成できますが、タスクを成功させるために必要なのは 1 つのみです。pool-0とpool-1をdefault-poolに置き換えます。これは、Airflow が使用するデフォルトのプールです。もう一度 DAG をアップロードします。
変更を行ったら、環境が更新されるまで数分待ちます。次に、ex-pod-affinity タスクを再度実行し、ex-pod-affinity タスクが成功することを確認します。
追加構成
この例では、KubernetesPodOperator で構成できる追加のパラメータを示します。
詳細については、以下のリソースをご覧ください。
Kubernetes Secret と ConfigMap の使用方法については、Kubernetes Secret と ConfigMap を使用するをご覧ください。
KubernetesPodOperator で Jinja テンプレートを使用する方法については、Jinja テンプレートを使用するをご覧ください。
KubernetesPodOperator パラメータの詳細については、Airflow ドキュメントのオペレーターのリファレンスをご覧ください。
Airflow 2
Airflow 1
Jinja テンプレートを使用する
Airflow は、DAG で Jinja テンプレートをサポートしています。
必要な Airflow パラメータ(task_id、name、image)を演算子で宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmds、arguments、env_vars、config_file など)をテンプレート化できます。
この例の env_vars パラメータは、my_value という名前の Airflow 変数から設定されます。例の DAG は、Airflow の vars テンプレート変数から値を取得します。Airflow には、さまざまな種類の情報にアクセスできる変数が他にもあります。たとえば、conf テンプレート変数を使用して、Airflow 構成オプションの値にアクセスできます。詳細と Airflow で使用可能な変数のリストについては、Airflow ドキュメントのテンプレート リファレンスをご覧ください。
DAG を変更するか env_vars 変数を作成しないと、変数が存在しないため、例の ex-kube-templates タスクは失敗します。この変数は、Airflow UI または Google Cloud CLI で作成します。
Airflow UI
Airflow UI に移動します。
ツールバーで、[管理者] > [変数] を選択します。
[List Variable] ページで、[新しいレコードを追加する] をクリックします。
[Add Variable] ページで、次の情報を入力します。
- Key:
my_value - Val:
example_value
- Key:
[保存] をクリックします。
環境で Airflow 1 を使用している場合は、代わりに次のコマンドを実行します。
Airflow UI に移動します。
ツールバーで、[管理者] > [変数] を選択します。
[Variables] ページで、[Create] タブをクリックします。
[Variable] ページで、次の情報を入力します。
- Key:
my_value - Val:
example_value
- Key:
[保存] をクリックします。
gcloud
次のコマンドを入力します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
環境で Airflow 1 を使用している場合は、代わりに次のコマンドを実行します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
以下のように置き換えます。
ENVIRONMENTを環境の名前に置き換えます。LOCATIONは、環境が配置されているリージョン。
次の例では、KubernetesPodOperator で Jinja テンプレートを使用する方法を示します。
Airflow 2
Airflow 1
Kubernetes Secret と ConfigMap を使用する
Kubernetes Secret は、センシティブ データを含むオブジェクトです。Kubernetes ConfigMap は、Key-Value ペアに非機密データを含むオブジェクトです。
Managed Airflow(第 2 世代)では、Google Cloud CLI、API、Terraform を使用して Secret と ConfigMap を作成し、KubernetesPodOperator からそれらにアクセスできます。
YAML 構成ファイルについて
Google Cloud CLI と API を使用して Kubernetes Secret または ConfigMap を作成する場合は、YAML 形式のファイルを指定します。このファイルは、Kubernetes Secret と ConfigMap で使用される形式と同じ形式にする必要があります。Kubernetes ドキュメントには、ConfigMap と Secret の多くのコードサンプルが用意されています。使用を開始するには、Secret を使用して安全に認証情報を配布するページと ConfigMaps をご覧ください。
Kubernetes Secret の場合と同じように、Secret で値を定義する場合は base64 表現を使用します。
値をエンコードするには、次のコマンドを使用します(これは、Base64 でエンコードされた値を取得する多くの方法の一つです)。
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
出力:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
次の 2 つの YAML ファイルの例は、このガイドの後半のサンプルで使用されます。Kubernetes Secret の YAML 構成ファイルの例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
ファイルを含める方法を示す別の例。前の例と同様に、まずファイルの内容(cat ./key.json | base64)をエンコードしてから、YAML ファイルでこの値を指定します。
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap の YAML 構成ファイルの例。ConfigMap で base64 表現を使用する必要はありません。
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Kubernetes Secret を管理する
マネージド Airflow(第 2 世代)では、Google Cloud CLI と kubectl を使用して Secret を作成します。
環境のクラスタに関する情報を取得する
次のコマンドを実行します。
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"以下のように置き換えます。
ENVIRONMENTは、環境の名前です。LOCATIONは、Managed Airflow 環境が配置されているリージョンに置き換えます。
このコマンドの出力では、
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>の形式が使用されます。GKE クラスタ ID を取得するには、
/clusters/の後の出力(文末は-gke)をコピーします。ゾーンを取得するには、
/zones/の後の出力をコピーします。
次のコマンドを使用して GKE クラスタに接続します。
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE以下のように置き換えます。
CLUSTER_ID: 環境のクラスタ ID。PROJECT_ID: プロジェクト IDZONEは、環境のクラスタが配置されているゾーンに置き換えます。
Kubernetes Secret を作成します。
次のコマンドは、Kubernetes Secret を作成するための 2 つの方法を示しています。
--from-literalの方法では、Key-Value ペアを使用します。--from-fileの方法では、ファイルの内容を使用します。Key-Value ペアを指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、
test_valueの値を持つsql_alchemy_connフィールドがあるairflow-secretsという名前の Secret を作成します。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_valueファイルの内容を指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、ローカル
./key.jsonファイルの内容から取得した値を持つservice-account.jsonフィールドがあるservice-accountという名前の Secret を作成します。kubectl create secret generic service-account \ --from-file service-account.json=./key.json
DAG で Kubernetes Secret を使用する
この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。
最初の Secret である airflow-secrets は、SQL_CONN という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Managed Airflow の環境変数とは異なります)。
2 番目の Secret である service-account は、サービス アカウント トークンを含むファイルである service-account.json を /var/secrets/google にマウントします。
Secret オブジェクトは次のようになります。
Airflow 2
Airflow 1
最初の Kubernetes Secret の名前は、secret_env 変数で定義されます。この Secret の名前は airflow-secrets です。deploy_type パラメータでは、環境変数として公開する必要があることを指定します。環境変数の名前は SQL_CONN で、deploy_target パラメータで指定されます。最後に、SQL_CONN 環境変数の値が sql_alchemy_conn キーの値に設定されます。
2 番目の Kubernetes Secret の名前は、secret_volume 変数で定義されています。この Secret の名前は service-account です。deploy_type で指定されるように、ボリュームとして公開されます。マウントするファイルのパス deploy_target は /var/secrets/google です。最後に、deploy_target に保管される Secret の key は service-account.json です。
演算子の構成は、次のようになります。
Airflow 2
Airflow 1
CNCF Kubernetes プロバイダに関する情報
KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes プロバイダに実装されています。
CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。
バージョン 6.0.0
CNCF Kubernetes プロバイダ パッケージ バージョン 6.0.0 では、KubernetesPodOperator で kubernetes_default 接続がデフォルトで使用されます。
バージョン 5.0.0 でカスタム接続を指定した場合も、このカスタム接続はオペレータによって引き続き使用されます。kubernetes_default 接続を使用するように戻すには、DAG を適宜調整しなければならない場合があります。
バージョン 5.0.0
このバージョンでは、バージョン 4.4.0 と比較して下位互換性のない変更がいくつか導入されています。最も重要なのは、バージョン 5.0.0 で使用されない kubernetes_default 接続に関連した変更です。
kubernetes_default接続を変更する必要があります。Kubernetes 構成パスは/home/airflow/composer_kube_configに設定する必要があります(次の図を参照)。または、config_fileを KubernetesPodOperator 構成に追加する必要があります(次のコード例を参照)。
- 次の方法で KubernetesPodOperator を使用してタスクのコードを変更します。
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
バージョン 5.0.0 の詳細については、CNCF Kubernetes プロバイダ リリースノートをご覧ください。
トラブルシューティング
このセクションでは、KubernetesPodOperator の一般的な問題のトラブルシューティングに関するアドバイスを提供します。
ログを表示
問題のトラブルシューティング時には、次の順序でログを確認できます。
Airflow タスクログ:
Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
DAG の名前をクリックし、DAG 実行をクリックして詳細とログを表示します。
Airflow スケジューラ ログ:
[環境の詳細] ページに移動します。
[ログ] タブに移動します。
Airflow スケジューラ ログを調べます。
Google Cloud コンソールで GKE ワークロードの下にある Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。
ゼロ以外の戻りコード
KubernetesPodOperator(および GKEStartPodOperator)を使用する場合、コンテナのエントリ ポイントの戻りコードから、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。
一般的なパターンは、コンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化することです。
このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。
Pod のタイムアウト
KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。
Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
手元のタスクを実行するために Managed Airflow サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。
新しい接続の確立に失敗した
GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
クラスタがアップグレードされているかどうかを確認するには、 Google Cloud コンソールで [Kubernetes クラスタ] ページに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。