Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Apache Airflow bietet eine REST API-Schnittstelle, mit der Sie Aufgaben wie das Abrufen von Informationen zu DAG-Ausführungen und -Aufgaben, das Aktualisieren von DAGs, das Abrufen von Airflow-Konfigurationen, das Hinzufügen und Löschen von Verbindungen und das Auflisten von Nutzern ausführen können.
Ein Beispiel für die Verwendung der Airflow REST API mit Cloud Run Functions finden Sie unter DAGs mit Cloud Run Functions auslösen.
Airflow REST API-Versionen
- Airflow 3 verwendet die Airflow REST API v2.
- Airflow 2 verwendet die Airflow REST API v1.
Airflow REST API konfigurieren
Airflow 3
Airflow 3 verwendet die Airflow REST API v2.
Cloud Composer verwendet ein eigenes API-Authentifizierungs-Backend.Die Autorisierung funktioniert wie gewohnt von Airflow 3.
Wenn ein neuer Nutzer über die API autorisiert wird, erhält das Nutzerkonto standardmäßig die Rolle Op.
Die Airflow REST API in Airflow 3 ist immer aktiviert und kann nicht deaktiviert werden. Sie können die Standardnutzerrolle ändern, indem Sie die folgende Airflow-Konfigurationsoption überschreiben:
| Bereich | Schlüssel | Wert | Hinweise |
|---|---|---|---|
api
|
composer_auth_user_registration_role
|
Op
|
Sie können eine beliebige andere Rolle angeben |
Airflow 2
Die Airflow REST API v1 ist in Airflow 2 standardmäßig aktiviert.
Cloud Composer verwendet ein eigenes API-Authentifizierungs-Backend.Die Autorisierung funktioniert wie gewohnt von Airflow. Wenn ein neuer Nutzer über die API autorisiert wird, erhält das Nutzerkonto standardmäßig die Rolle Op.
Sie können die Airflow REST API aktivieren oder deaktivieren oder die Standardnutzerrolle ändern. Dazu überschreiben Sie die folgenden Airflow-Konfigurationsoptionen:
| Bereich | Schlüssel | Wert | Hinweise |
|---|---|---|---|
api
|
auth_backends
|
airflow.composer.api.backend.composer_auth
|
Wenn Sie die Airflow REST API deaktivieren möchten, ändern Sie die Einstellung zu airflow.api.auth.backend.deny_all. |
api
|
composer_auth_user_registration_role
|
Op
|
Sie können eine beliebige andere Rolle angeben |
API-Aufrufe an die Airflow REST API über die Zugriffssteuerung des Webservers zulassen
Je nach Methode, die zum Aufrufen der Airflow REST API verwendet wird, kann die Aufrufmethode entweder eine IPv4- oder eine IPv6-Adresse verwenden. Denken Sie daran, den IP-Traffic zur Airflow REST API mithilfe der Webserver-Zugriffskontrolle zu entsperren.
Verwenden Sie die Standardkonfigurationsoption All IP addresses have access (default), wenn Sie nicht sicher sind, von welchen IP-Adressen Ihre Aufrufe der Airflow REST API gesendet werden.
Aufrufe an Airflow REST API senden
Dieser Abschnitt enthält ein Beispielskript in Python, mit dem Sie DAGs mit der Airflow REST API auslösen können.
Legen Sie im Skript die folgenden Variablen fest:
dag_id: Name eines DAG, wie in der DAG-Quelldatei definiert.dag_config: Konfiguration für die DAG-Ausführung.web_server_url: Ihre Airflow-Webserver-URL. Das Format dafür isthttps://<web-server-id>.composer.googleusercontent.com.(Airflow 3)
logical_date: Das logische Datum für die DAG-Ausführung.
Airflow 3
"""
Trigger a DAG in Cloud Composer 3 environment with Airflow 3 using the Airflow REST API v2.
"""
from __future__ import annotations
from typing import Any
import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests
# Following best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
def make_composer3_web_server_request(
url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
"""
Make a request to Cloud Composer 3 environment's web server with Airflow 3.
Args:
url: The URL to fetch.
method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
'PATCH', 'DELETE')
**kwargs: Any of the parameters defined for the request function:
https://github.com/requests/requests/blob/master/requests/api.py
If no timeout is provided, it is set to 90 by default.
"""
authed_session = AuthorizedSession(CREDENTIALS)
# Set the default timeout, if missing
if "timeout" not in kwargs:
kwargs["timeout"] = 90
return authed_session.request(method, url, **kwargs)
def trigger_dag(web_server_url: str, dag_id: str, data: dict, logical_date: str) -> str:
"""
Make a request to trigger a DAG using the Airflow REST API v2.
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Args:
web_server_url: The URL of the Airflow 3 web server.
dag_id: The DAG ID.
data: Additional configuration parameters for the DAG run (json).
"""
endpoint = f"api/v2/dags/{dag_id}/dagRuns"
request_url = f"{web_server_url}/{endpoint}"
json_data = {"conf": data, "logical_date": logical_date}
response = make_composer3_web_server_request(
request_url, method="POST", json=json_data
)
if response.status_code == 403:
raise requests.HTTPError(
"You do not have a permission to perform this operation. "
"Check Airflow RBAC roles for your account."
f"{response.headers} / {response.text}"
)
elif response.status_code != 200:
response.raise_for_status()
else:
return response.text
if __name__ == "__main__":
# TODO(developer): replace with your values
dag_id = "airflow_monitoring" # Replace with the ID of the DAG that you want to run.
dag_config = {
"your-key": "your-value"
} # Replace with configuration parameters for the DAG run.
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
logical_date = "2025-01-01T14:00:00Z" # Replace with the data interval for which to run the DAG
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer-staging.googleusercontent.com"
)
response_text = trigger_dag(
web_server_url=web_server_url, dag_id=dag_id, data=dag_config, logical_date=logical_date
)
print(response_text)
Airflow 2
Mit einem Dienstkonto auf die Airflow REST API zugreifen
In der Airflow-Datenbank in Airflow-Versionen vor 2.3.0 ist die Länge des E-Mail-Felds auf 64 Zeichen begrenzt. Dienstkonten haben manchmal E‑Mail-Adressen, die länger als 64 Zeichen sind. Es ist nicht möglich, Airflow-Nutzer für solche Dienstkonten auf die übliche Weise zu erstellen. Wenn für ein solches Dienstkonto kein Airflow-Nutzer vorhanden ist, führt der Zugriff auf die Airflow REST API zu den HTTP-Fehlern 401 und 403.
Als Workaround können Sie einen Airflow-Nutzer für ein Dienstkonto vorregistrieren. Verwenden Sie dazu accounts.google.com:NUMERIC_USER_ID als Nutzernamen und einen beliebigen eindeutigen String als E-Mail-Adresse.
So rufen Sie
NUMERIC_USER_IDfür ein Dienstkonto ab:gcloud iam service-accounts describe \ SA_NAME@PROJECT_ID.iam.gserviceaccount.com \ --format="value(oauth2ClientId)"Ersetzen Sie:
SA_NAMEdurch den Namen des Dienstkontos.PROJECT_IDdurch die Projekt-ID.
Erstellen Sie einen Airflow-Nutzer mit der Rolle
Opfür das Dienstkonto:Airflow-UI
Gehen Sie zu Sicherheit > Nutzer auflisten und klicken Sie auf Neuen Datensatz hinzufügen. Ihr Airflow-Nutzer muss die Rolle
Adminhaben, um diese Seite zu öffnen.Geben Sie
accounts.google.com:NUMERIC_USER_IDals Nutzernamen an. Ersetzen SieNUMERIC_USER_IDdurch die Nutzer-ID, die Sie im vorherigen Schritt erhalten haben.Geben Sie eine eindeutige Kennung als E‑Mail-Adresse an. Sie können einen beliebigen eindeutigen String verwenden.
Geben Sie die Rolle für den Nutzer an. Beispiel:
Op.Achten Sie darauf, dass das Kästchen Aktiv? angeklickt ist.
Geben Sie den Vor- und Nachnamen des Nutzers an. Sie können einen beliebigen String verwenden.
Klicken Sie auf Speichern.
gcloud
Führen Sie den folgenden Airflow-Befehlszeilenbefehl aus:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users create -- \ -u accounts.google.com:NUMERIC_USER_ID \ -e UNIQUE_ID \ -f UNIQUE_ID \ -l - -r Op --use-random-passwordErsetzen Sie:
ENVIRONMENT_NAMEdurch den Namen der Umgebung.LOCATIONdurch die Region, in der sich die Umgebung befindet.- Ersetzen Sie
NUMERIC_USER_IDdurch die Nutzer-ID, die Sie im vorherigen Schritt erhalten haben. UNIQUE_IDdurch die Kennung für den Airflow-Nutzer. Sie können einen beliebigen eindeutigen String verwenden.
Nachdem Sie einen Airflow-Nutzer für ein Dienstkonto erstellt haben, wird ein Aufrufer, der als Dienstkonto authentifiziert ist, als vorregistrierter Nutzer erkannt und in Airflow angemeldet.
Airflow REST API-Komponente skalieren
Airflow REST API- und Airflow UI-Endpunkte werden auf dem Airflow-Webserver ausgeführt. Wenn Sie die REST API intensiv nutzen, sollten Sie die Menge an CPU und Arbeitsspeicher, die für den Airflow-Webserver verfügbar ist, entsprechend der erwarteten Last erhöhen.