Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Auf dieser Seite wird erläutert, wie die Planung und das Auslösen von DAGs in Airflow funktionieren, wie Sie einen Zeitplan für einen DAG definieren und wie Sie einen DAG manuell auslösen oder pausieren.
Airflow-DAGs in Managed Airflow
Airflow-DAGs in Managed Airflow werden in einer oder mehreren Managed Airflow-Umgebungen in Ihrem Projekt ausgeführt. Sie laden die Quelldateien Ihrer Airflow-DAGs in einen Cloud Storage-Bucket hoch, der mit einer Umgebung verknüpft ist. Die Instanz von Airflow in der Umgebung parst dann diese Dateien und plant DAG-Ausführungen gemäß dem Zeitplan der einzelnen DAGs. Während einer DAG-Ausführung plant und führt Airflow einzelne Aufgaben aus, aus denen ein DAG besteht, und zwar in einer Reihenfolge, die durch den DAG definiert ist.
Weitere Informationen zu den Kernkonzepten von Airflow wie Airflow-DAGs, DAG-Ausführungen, Aufgaben oder Operatoren finden Sie auf der Seite Kernkonzepte in der Airflow-Dokumentation.
DAG-Planung in Airflow
Airflow bietet die folgenden Konzepte für den Planungsmechanismus:
- Logisches Datum
Stellt ein Datum dar, für das eine bestimmte DAG-Ausführung ausgeführt wird.
Dies ist nicht das tatsächliche Datum, an dem Airflow einen DAG ausführt, sondern ein Zeitraum, der von einer bestimmten DAG-Ausführung verarbeitet werden muss. Bei einem DAG, der beispielsweise täglich um 12:00 Uhr ausgeführt werden soll, ist das logische Datum ebenfalls 12:00 Uhr an einem bestimmten Tag. Da er zweimal täglich ausgeführt wird, muss er den Zeitraum der letzten 12 Stunden verarbeiten. Gleichzeitig wird das logische Datum oder das Zeitintervall möglicherweise überhaupt nicht in der im DAG selbst definierten Logik verwendet. Ein DAG kann beispielsweise einmal täglich dasselbe Skript ausführen, ohne den Wert des logischen Datums zu verwenden.
In Airflow-Versionen vor 2.2 wird dieses Datum als Ausführungsdatum bezeichnet.
- Ausführungsdatum
Stellt ein Datum dar, an dem eine bestimmte DAG-Ausführung ausgeführt wird.
Bei einem DAG, der beispielsweise täglich um 12:00 Uhr ausgeführt werden soll, kann die tatsächliche Ausführung des DAG um 12:05 Uhr erfolgen, also einige Zeit nach dem logischen Datum.
- Zeitplanintervall
Gibt an, wann und wie oft ein DAG in Bezug auf logische Daten ausgeführt werden muss.
Ein täglicher Zeitplan bedeutet beispielsweise, dass ein DAG einmal pro Tag ausgeführt wird und die logischen Daten für die DAG-Ausführungen Intervalle von 24 Stunden haben.
- Startdatum
Gibt an, wann Airflow Ihren DAG planen soll.
Aufgaben in Ihrem DAG können individuelle Startzeiten haben oder Sie können ein einziges Startdatum für alle Aufgaben festlegen. Basierend auf dem Mindeststartdatum für Aufgaben in Ihrem DAG und dem Zeitplanintervall plant Airflow DAG-Ausführungen.
- Catchup, Backfill und Wiederholungen
Mechanismen zum Ausführen von DAG-Ausführungen für vergangene Daten.
Mit Catchup werden DAG-Ausführungen ausgeführt, die noch nicht ausgeführt wurden, z. B. wenn der DAG für einen längeren Zeitraum pausiert und dann wieder aktiviert wurde. Mit Backfill können Sie DAG-Ausführungen für einen bestimmten Zeitraum ausführen. Mit Wiederholungen wird angegeben, wie viele Versuche Airflow beim Ausführen von Aufgaben aus einem DAG unternehmen muss.
Die Planung funktioniert folgendermaßen:
Nach dem Startdatum wartet Airflow auf das nächste Auftreten des Zeitplanintervalls.
Airflow plant die erste DAG-Ausführung am Ende dieses Zeitplanintervalls.
Wenn ein DAG beispielsweise jede Stunde ausgeführt werden soll und das Startdatum um 12:00 Uhr ist, erfolgt die erste DAG-Ausführung heute um 13:00 Uhr.
Im Abschnitt Airflow-DAG planen dieses Dokuments wird beschrieben, wie Sie die Planung für Ihre DAGs mithilfe dieser Konzepte einrichten. Weitere Informationen zu DAG-Ausführungen und zur Planung finden Sie in der Airflow-Dokumentation unter DAG-Ausführungen.
Möglichkeiten zum Auslösen eines DAG
Airflow bietet die folgenden Möglichkeiten zum Auslösen eines DAG:
Trigger nach Zeitplan starten. Airflow löst den DAG automatisch anhand des Zeitplans aus, der in der DAG-Datei angegeben ist.
Manuell auslösen. Sie können einen DAG manuell über die Google Cloud Console, die Airflow-UI oder durch einen Airflow-Befehlszeilenbefehl von der Google Cloud CLI auslösen.
Trigger als Reaktion auf Ereignisse. Als Standardmethode zum Auslösen eines DAG als Reaktion auf Ereignisse wird die Verwendung eines Sensors verwendet.
Weitere Möglichkeiten zum Auslösen von DAGs:
Programmatisch auslösen. Sie können einen DAG mit der Airflow REST API auslösen. Zum Beispiel aus einem Python-Skript.
Programmatisch als Reaktion auf Ereignisse auslösen. Sie können DAGs als Reaktion auf Ereignisse mithilfe von Cloud Run-Funktionen und der Airflow REST APIauslösen.
Hinweis
- Ihr Konto muss eine Rolle haben, mit der Objekte in den Umgebungs-Buckets verwaltet sowie DAGs aufgerufen und ausgelöst werden können. Weitere Informationen finden Sie unter Zugriffssteuerung.
Airflow-DAG planen
Sie definieren einen Zeitplan für einen DAG in der DAG-Datei. Bearbeiten Sie die Definition des DAG so:
Suchen und bearbeiten Sie die DAG-Datei auf Ihrem Computer. Wenn Sie die DAG Datei nicht haben, können Sie eine Kopie aus dem Bucket der Umgebung herunterladen. Bei einem neuen DAG können Sie alle Parameter definieren, wenn Sie die DAG-Datei erstellen.
Definieren Sie den Zeitplan im Parameter
schedule. Sie können einen Cron-Ausdruck wie0 0 * * *oder eine Voreinstellung wie@dailyverwenden. Weitere Informationen finden Sie in der Airflow-Dokumentation unter Cron-Ausdrücke und Zeitintervalle.Airflow bestimmt logische Daten für DAG-Ausführungen basierend auf dem von Ihnen festgelegten Zeitplan.
Definieren Sie das Startdatum im Parameter
start_date.Airflow bestimmt das logische Datum der ersten DAG-Ausführung anhand dieses Parameters.
(Optional) Definieren Sie im Parameter
catchup, ob Airflow alle vorherigen Ausführungen dieses DAG vom Startdatum bis zum aktuellen Datum ausführen muss, die noch nicht ausgeführt wurden.Bei DAG-Ausführungen, die während des Catchups ausgeführt werden, liegt das logische Datum in der Vergangenheit und das Ausführungsdatum gibt den Zeitpunkt an, zu dem die DAG-Ausführung tatsächlich ausgeführt wurde.
(Optional) Definieren Sie im Parameter
retries, wie oft Airflow Aufgaben wiederholen muss, die fehlgeschlagen sind (jeder DAG besteht aus einer oder mehreren einzelnen Aufgaben). Standardmäßig werden Aufgaben in Managed Airflow zweimal wiederholt.Laden Sie die neue Version des DAG in den Bucket der Umgebung hoch.
Warten Sie, bis Airflow den DAG erfolgreich geparst hat. Sie können beispielsweise die Liste der DAGs in Ihrer Umgebung in der Google Cloud Console oder in der Airflow-UI prüfen.
Die folgende Beispiel-DAG-Definition wird zweimal täglich um 00:00 Uhr und 12:00 Uhr ausgeführt. Das Startdatum ist auf den 1. Januar 2024 festgelegt. Airflow führt den DAG jedoch nicht für vergangene Daten aus, nachdem Sie ihn hochgeladen oder pausiert haben, da der Catchup deaktiviert ist.
Der DAG enthält eine Aufgabe mit dem Namen insert_query_job, die mit dem Operator BigQueryInsertJobOperator eine Zeile in eine Tabelle einfügt. Dieser Operator ist einer der
Google Cloud BigQuery-Operatoren,
mit denen Sie Datasets und Tabellen verwalten, Abfragen ausführen und Daten validieren können.
Wenn eine bestimmte Ausführung dieser Aufgabe fehlschlägt, wiederholt Airflow sie mit dem Standardintervall für Wiederholungen vier weitere Male. Das logische Datum für diese Wiederholungen bleibt gleich.
Die SQL-Abfrage für diese Zeile verwendet Airflow-Vorlagen, um das logische Datum und den Namen des DAG in die Zeile zu schreiben.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Wenn Sie diesen DAG testen möchten, können Sie ihn manuell auslösen und sich dann die Logs der Aufgabenausführung ansehen.
Weitere Beispiele für Planungsparameter
Die folgenden Beispiele für Planungsparameter zeigen, wie die Planung mit verschiedenen Parameterkombinationen funktioniert:
Wenn
start_dateden Wertdatetime(2024, 4, 4, 16, 25)undscheduleden Wert30 16 * * *hat, erfolgt die erste DAG-Ausführung am 5. April 2024 um 16:30 Uhr.Wenn
start_dateden Wertdatetime(2024, 4, 4, 16, 35)undscheduleden Wert30 16 * * *hat, erfolgt die erste DAG-Ausführung am 6. April 2024 um 16:30 Uhr. Da das Startdatum nach dem Zeitplanintervall am 4. April 2024 liegt, erfolgt die DAG-Ausführung nicht am 5. April 2024. Stattdessen endet das Zeitplanintervall am 5. April 2024 um 16:35 Uhr. Die nächste DAG-Ausführung wird also am folgenden Tag um 16:30 Uhr geplant.Wenn
start_dateden Wertdatetime(2024, 4, 4)undscheduleden Wert@dailyhat, wird die erste DAG-Ausführung am 5. April 2024 um 00:00 Uhr geplant.Wenn
start_dateden Wertdatetime(2024, 4, 4, 16, 30)undscheduleden Wert0 * * * *hat, wird die erste DAG-Ausführung am 4. April 2024 um 18:00 Uhr geplant. Nach dem angegebenen Datum und der angegebenen Uhrzeit plant Airflow eine DAG-Ausführung zur Minute 0 jeder Stunde. Der nächste Zeitpunkt, zu dem dies der Fall ist, ist 17:00 Uhr. Zu diesem Zeitpunkt plant Airflow eine DAG-Ausführung am Ende des Zeitplanintervalls, also um 18:00 Uhr.
DAG manuell auslösen
Wenn Sie einen Airflow-DAG manuell auslösen, führt Airflow den DAG einmal aus, unabhängig vom Zeitplan, der in der DAG-Datei angegeben ist.
Console
Die DAG-UI wird in Managed Airflow 1.17.8 und höheren Versionen unterstützt.
So lösen Sie einen DAG über die Google Cloud Console aus:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie eine Umgebung aus, um die zugehörigen Details anzuzeigen.
Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.
Klicken Sie auf den Namen eines DAG.
Klicken Sie auf der Seite DAG-Details auf DAG auslösen. Eine neue DAG-Ausführung wird erstellt.
Airflow-UI
So lösen Sie einen DAG über die Airflow-UI aus:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Melden Sie sich mit einem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.
Klicken Sie in der Airflow-UI auf der Seite DAGs für Ihren DAG auf die Schaltfläche DAG auslösen.
gcloud
Führen Sie in Airflow 1.10.12 oder früher den Airflow-Befehlszeilenbefehl trigger_dag aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
Führen Sie in Airflow 1.10.14 oder höher, einschließlich Airflow 2, den Airflow-Befehlszeilenbefehl dags trigger aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME: der Name Ihrer UmgebungLOCATION: die Region, in der sich die Umgebung befindetDAG_ID: der Name des DAG
Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Managed Airflow-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.
Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie unter
der gcloud composer environments run Befehlsreferenz.
Logs und Details zu DAG-Ausführungen ansehen
In der Google Cloud Console haben Sie folgende Möglichkeiten:
- Status vergangener DAG-Ausführungen und DAG-Details ansehen.
- Detaillierte Logs aller DAG-Ausführungen und aller Aufgaben aus diesen DAGs ansehen
- DAG-Statistiken ansehen
Darüber hinaus bietet Managed Airflow Zugriff auf die Airflow-UI, die eigene Weboberfläche von Airflow.
DAG pausieren
Console
Die DAG-UI wird in Managed Airflow 1.17.8 und höheren Versionen unterstützt.
So pausieren Sie einen DAG über die Google Cloud Console:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie eine Umgebung aus, um die zugehörigen Details anzuzeigen.
Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.
Klicken Sie auf den Namen eines DAG.
Klicken Sie auf der Seite DAG-Details auf DAG pausieren.
Airflow-UI
So pausieren Sie einen DAG über die Airflow-UI:
- Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Melden Sie sich mit einem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.
Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs auf den Schalter neben dem Namen des DAG.
gcloud
Führen Sie in Airflow 1.10.12 oder früher den Airflow-Befehlszeilenbefehl pause aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
pause -- DAG_ID
Führen Sie in Airflow 1.10.14 oder höher, einschließlich Airflow 2, den Airflow-Befehlszeilenbefehl dags pause aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME: der Name Ihrer UmgebungLOCATION: die Region, in der sich die Umgebung befindetDAG_ID: der Name des DAG
Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Managed Airflow-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.
Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie unter
der gcloud composer environments run Befehlsreferenz.