Cloud Run Functions 및 Airflow REST API로 Cloud Composer DAG 트리거

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 Cloud Run Functions를 사용하여 이벤트에 대한 응답으로 Cloud Composer DAG를 트리거하는 방법을 설명합니다.

Apache Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 수행하는 한 가지 방법은 Cloud Run Functions를 사용하여 지정된 이벤트가 발생할 때 Cloud Composer DAG를 트리거하는 것입니다.

이 가이드의 예는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 DAG를 실행합니다. 버킷의 객체가 변경되면 함수가 트리거됩니다. 이 함수는 Cloud Composer 환경의 Airflow REST API를 요청합니다. Airflow가 이 요청을 처리하고 DAG를 실행합니다. DAG는 변경사항에 대한 정보를 출력합니다.

시작하기 전에

환경의 네트워킹 구성 확인

비공개 IP 및 VPC 서비스 제어 구성에서는 Cloud Run Functions에서 Airflow 웹 서버로의 연결을 구성할 수 없으므로 해당 구성에서는 이 솔루션이 작동하지 않습니다.

Cloud Composer 2에서 또 다른 접근 방법인 Cloud Run Functions 및 Pub/Sub 메시지를 사용하여 DAG 트리거를 사용할 수 있습니다.

프로젝트에 API 사용 설정

콘솔

Cloud Composer 및 Cloud Run Functions API를 사용 설정합니다.

API 사용 설정에 필요한 역할

API를 사용 설정하려면 serviceusage.services.enable 권한이 포함된 서비스 사용량 관리자 IAM 역할(roles/serviceusage.serviceUsageAdmin)이 필요합니다. 역할 부여 방법 알아보기.

API 사용 설정

gcloud

Cloud Composer 및 Cloud Run Functions API를 사용 설정합니다.

API 사용 설정에 필요한 역할

API를 사용 설정하려면 역할 (roles/serviceusage.serviceUsageAdmin)이 포함된 서비스 사용량 관리자 IAM serviceusage.services.enable 권한이 필요합니다. 역할 부여 방법 알아보기.

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Airflow REST API 사용 설정

Airflow 버전에 따라 다음 안내를 따르세요.

웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 API 호출 허용

Cloud Run Functions는 IPv4 또는 IPv6 주소를 사용하여 Airflow REST API에 연결할 수 있습니다.

호출 IP 범위를 모르는 경우 웹 서버 액세스 제어의 기본 구성 옵션인 All IP addresses have access (default)를 사용하여 Cloud Run Functions를 실수로 차단하지 않도록 합니다.

Cloud Storage 버킷 만들기

이 예시에서는 Cloud Storage 버킷의 변경사항에 대한 응답으로 DAG를 트리거합니다. 이 예시에 사용할 새 버킷을 만듭니다.

Airflow 웹 서버 URL 가져오기

이 예시에서는 Airflow 웹 서버 엔드포인트에 REST API 요청을 보냅니다. Cloud 함수 코드에서 .appspot.com 앞에 Airflow 웹 인터페이스 URL의 일부를 사용합니다.

콘솔

  1. 콘솔에서 환경 페이지로 이동합니다. Google Cloud

    환경으로 이동

  2. 환경 이름을 클릭합니다.

  3. 환경 세부정보 페이지에서 환경 구성 탭으로 이동합니다.

  4. Airflow 웹 서버의 URL이 Airflow 웹 UI 항목에 나열됩니다.

gcloud

다음 명령어를 실행합니다.

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME: 환경 이름
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

IAM 프록시의 client_id 가져오기

Airflow REST API 엔드포인트를 요청하기 위해 함수에는 Airflow 웹 서버를 보호하는 Identity and Access Management 프록시의 클라이언트 ID가 필요합니다.

Cloud Composer는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다.

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL를 Airflow 웹 인터페이스의 URL로 바꿉니다.

출력에서 client_id 다음에 오는 문자열을 검색합니다. 예를 들면 다음과 같습니다.

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

다음 코드를 get_client_id.py 파일에 저장합니다. project_id, location, composer_environment 값을 입력한 후 Cloud Shell이나 로컬 환경에서 코드를 실행합니다.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

환경에 DAG를 업로드합니다.

환경에 DAG를 업로드합니다. 다음 예시 DAG는 수신된 DAG 실행 구성을 출력합니다. 이 가이드에서 나중에 만드는 함수로부터 이 DAG를 트리거합니다.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

DAG를 트리거하는 Cloud 함수 배포

Cloud Run Functions 또는 Cloud Run에서 지원되는 선호 언어를 사용하여 Cloud 함수를 배포할 수 있습니다. 이 튜토리얼에서는 PythonJava로 구현된 Cloud 함수를 보여줍니다.

Cloud 함수 구성 매개변수 지정

  • 트리거. 이 예시에서는 새 객체가 버킷에 생성될 때 또는 기존 객체를 덮어쓸 때 작동하는 트리거를 선택합니다.

    • 트리거 유형. Cloud Storage

    • 이벤트 유형. 완료/생성.

    • 버킷. 이 함수를 트리거해야 하는 버킷을 선택합니다.

    • 실패 시 재시도. 이 예시에서는 이 옵션을 사용 중지하는 것이 좋습니다. 프로덕션 환경에서 함수를 사용할 경우 일시적인 오류를 처리하기 위해 이 옵션을 사용 설정합니다.

  • 런타임, 빌드, 연결, 보안 설정 섹션의 런타임 서비스 계정. 환경설정에 따라 다음 옵션 중 하나를 사용합니다.

    • Compute Engine 기본 서비스 계정을 선택합니다. 기본 IAM 권한의 경우 이 계정이 Cloud Composer 환경에 액세스하는 함수를 실행할 수 있습니다.

    • Composer User 역할이 있는 커스텀 서비스 계정을 만들고 이 함수에 대한 런타임 서비스 계정으로 지정합니다. 이 옵션은 최소 권한 원칙을 따릅니다.

  • 코드 단계의 런타임 및 진입점. 이 예시의 코드를 추가할 때 Python 3.7 또는 더 최근의 런타임을 선택하고 trigger_dag를 시작점으로 지정합니다.

요구사항 추가

requirements.txt 파일에 종속 항목을 지정합니다.

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

다음 코드를 main.py 파일에 넣고 다음과 같이 바꿉니다.

  • client_id 변수 값을 이전에 가져온 client_id 값으로 바꿉니다.

  • webserver_id 변수 값을 .appspot.com 앞의 Airflow 웹 인터페이스 URL에 포함된 테넌트 프로젝트 ID로 바꿉니다. 이전에 Airflow 웹 인터페이스 URL을 가져왔습니다.

  • 사용할 Airflow REST API 버전을 지정합니다.

    • 안정적인 Airflow REST API를 사용하는 경우 USE_EXPERIMENTAL_API 변수를 False로 설정합니다.
    • 실험용 Airflow REST API를 사용하는 경우 변경할 필요가 없습니다. USE_EXPERIMENTAL_API 변수가 이미 True로 설정되어 있습니다.


from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

함수 테스트

함수 및 DAG가 의도한 대로 작동하는지 확인하기 위해 다음 안내를 따르세요.

  1. 함수가 배포될 때까지 기다립니다.
  2. 파일을 Cloud Storage 버킷에 업로드합니다. 또는 콘솔에서 함수 테스트 작업을 선택하여 함수를 수동으로 트리거할 수 있습니다. Google Cloud
  3. Airflow 웹 인터페이스에서 DAG 페이지를 확인합니다. DAG는 DAG 실행이 활성 상태 또는 이미 완료된 상태여야 합니다.
  4. Airflow UI에서 이 실행의 작업 로그를 확인합니다. print_gcs_info 태스크가 함수에서 수신된 데이터를 로그에 출력하는지 확인해야 합니다.
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

다음 단계