Pipelines orchestrieren

Auf dieser Seite wird die Pipeline-Orchestrierung mit Managed Service for Apache Airflow und Triggern erläutert. Cloud Data Fusion empfiehlt die Verwendung von Managed Airflow zum Orchestrieren von Pipelines. Wenn Sie die Orchestrierung einfacher verwalten möchten, verwenden Sie Trigger.

Composer

Pipelines mit Managed Airflow orchestrieren

Die Orchestrierung der Pipelineausführung in Cloud Data Fusion mit Managed Airflow bietet folgende Vorteile:

  • Zentrale Workflowverwaltung:Die Ausführung mehrerer Cloud Data Fusion-Pipelines wird einheitlich verwaltet.
  • Abhängigkeitsverwaltung:Definieren Sie Abhängigkeiten zwischen Pipelines, um die richtige Ausführungsreihenfolge zu gewährleisten.
  • Monitoring und Benachrichtigungen:Managed Airflow bietet Monitoring-Funktionen und Benachrichtigungen bei Fehlern.
  • Einbindung in andere Dienste:Mit Managed Airflow können Sie Workflows orchestrieren, die sich über Cloud Data Fusion und andereGoogle Cloud -Dienste erstrecken.

So orchestrieren Sie Cloud Data Fusion-Pipelines mit Managed Airflow:

  1. Managed Airflow-Umgebung einrichten

    • Managed Airflow-Umgebung erstellen Falls Sie noch keine haben, stellen Sie die Umgebung in Ihrem Google Cloud -Projekt bereit. Diese Umgebung ist Ihr Orchestrierungsarbeitsbereich.
    • Berechtigungen erteilen Das Managed Airflow-Dienstkonto muss die erforderlichen Berechtigungen für den Zugriff auf Cloud Data Fusion haben, z. B. die Berechtigung zum Starten, Beenden und Auflisten von Pipelines.
  2. Gerichtete azyklische Graphen (Directed Acyclic Graphs, DAGs) für die Orchestrierung definieren:

    • DAG erstellen:Erstellen Sie in Managed Airflow einen DAG, der den Orchestrierungsworkflow für Ihre Cloud Data Fusion-Pipelines definiert.
    • Cloud Data Fusion-Operatoren:Verwenden Sie die Cloud Data Fusion-Operatoren von Managed Airflow in Ihrem DAG. Mit diesen Operatoren können Sie programmatisch mit Cloud Data Fusion interagieren.

Cloud Data Fusion-Operatoren

Für die Cloud Data Fusion-Pipelineorchestrierung sind die folgenden Operatoren verfügbar:

CloudDataFusionStartPipelineOperator

Löst die Ausführung einer Cloud Data Fusion-Pipeline anhand ihrer ID aus. Sie hat die folgenden Parameter:

  • Pipeline-ID
  • Standort (Google Cloud Region)
  • Pipeline-Namespace
  • Laufzeitargumente (optional)
  • Auf Abschluss warten (optional)
  • Zeitüberschreitung (optional)
CloudDataFusionStopPipelineOperator

Ermöglicht das Beenden einer laufenden Cloud Data Fusion-Pipeline.

CloudDataFusionDeletePipelineOperator

Löscht eine Cloud Data Fusion-Pipeline.

DAG-Workflow erstellen

Beachten Sie beim Erstellen des DAG-Workflows Folgendes:

  • Abhängigkeiten definieren:Verwenden Sie die DAG-Struktur, um Abhängigkeiten zwischen Aufgaben zu definieren. Sie haben beispielsweise eine Aufgabe, die darauf wartet, dass eine Pipeline in einem Namespace erfolgreich abgeschlossen wird, bevor eine andere Pipeline in einem anderen Namespace ausgelöst wird.
  • Planung:Sie können die DAG so planen, dass sie in bestimmten Intervallen ausgeführt wird, z. B. täglich oder stündlich, oder sie manuell auslösen.

Weitere Informationen finden Sie in der Übersicht über Managed Airflow.

Trigger

Pipelines mit Triggern orchestrieren

Mit Cloud Data Fusion-Triggern können Sie eine nachgelagerte Pipeline automatisch ausführen lassen, wenn eine oder mehrere vorgelagerte Pipelines abgeschlossen sind (erfolgreich, fehlgeschlagen oder bei einer beliebigen angegebenen Bedingung).

Trigger sind für die folgenden Aufgaben nützlich:

  • Bereinigen Ihrer Daten und für mehrere nachgelagerte Pipelines zur Verwendung zur Verfügung stellen.
  • Informationen wie Laufzeitargumente und Plug-in-Konfigurationen zwischen Pipelines freigeben. Diese Aufgabe wird als Nutzlastkonfiguration bezeichnet.
  • Eine Reihe dynamischer Pipelines haben, die mit den Daten von Stunde, Tag, Woche oder Monat ausgeführt werden können, anstelle einer statischen Pipeline, die bei jeder Ausführung aktualisiert werden muss.

Sie haben beispielsweise ein Dataset, das alle Informationen zu den Lieferungen Ihres Unternehmens enthält. Anhand dieser Daten möchten Sie mehrere geschäftliche Fragen beantworten. Dazu erstellen Sie eine Pipeline, mit der die Rohdaten zu Lieferungen bereinigt werden. Diese Pipeline heißt Shipments Data Cleaning. Anschließend erstellen Sie eine zweite Pipeline, Delayed Shipments USA, die die bereinigten Daten liest und die Lieferungen innerhalb der USA findet, die um mehr als einen angegebenen Schwellenwert verzögert wurden. Die Pipeline Delayed Shipments USA kann ausgelöst werden, sobald die Upstream-Pipeline Shipments Data Cleaning erfolgreich abgeschlossen wurde.

Da die nachgelagerte Pipeline die Ausgabe der vorgelagerten Pipeline verwendet, müssen Sie außerdem angeben, dass die nachgelagerte Pipeline bei der Ausführung mit diesem Trigger auch das Eingabeverzeichnis erhält, aus dem sie Daten lesen kann. Das ist das Verzeichnis, in dem die vorgelagerte Pipeline ihre Ausgabe generiert hat. Dieser Vorgang wird als Nutzlastkonfiguration übergeben bezeichnet. Sie wird mit Laufzeitargumenten definiert. Sie können eine Reihe dynamischer Pipelines erstellen, die mit den Daten von Stunde, Tag, Woche oder Monat ausgeführt werden (im Gegensatz zu einer statischen Pipeline, die bei jeder Ausführung aktualisiert werden muss).

So orchestrieren Sie Pipelines mit Triggern:

  1. Vorgelagerte und nachgelagerte Pipelines erstellen:

    • Entwerfen und stellen Sie im Cloud Data Fusion Studio die Pipelines bereit, aus denen Ihre Orchestrierungskette besteht.
    • Überlegen Sie, durch welche Pipeline die nächste Pipeline (Downstream) in Ihrem Workflow aktiviert wird.
  2. Optional: Laufzeitargumente für vorgelagerte Pipelines übergeben

  3. Eingehenden Trigger in der nachgelagerten Pipeline erstellen

    • Rufen Sie in Cloud Data Fusion Studio die Seite Liste auf. Klicken Sie auf dem Tab Bereitgestellt auf den Namen der nachgelagerten Pipeline. Die Bereitstellungsansicht für diese Pipeline wird angezeigt.
    • Klicken Sie in der Mitte der linken Seite der Seite auf Eingehende Trigger. Eine Liste der verfügbaren Pipelines wird angezeigt.
    • Klicken Sie auf die vorgelagerte Pipeline. Wählen Sie einen oder mehrere Abschlussstatus der vorgelagerten Pipeline als Bedingung aus (Erfolgreich, Fehlgeschlagen oder Angehalten), für den Zeitpunkt, an dem die nachgelagerte Pipeline ausgeführt werden soll.
    • Wenn die vorgelagerte Pipeline Informationen (auch als Nutzlastkonfiguration bezeichnet) an die nachgelagerte Pipeline weitergeben soll, klicken Sie auf Triggerkonfiguration und folgen Sie der Anleitung zum Übergeben der Nutzlastkonfiguration als Laufzeitargument. Klicken Sie andernfalls auf Trigger aktivieren.
  4. Trigger testen

    • Starten Sie eine Ausführung der vorgelagerten Pipeline.
    • Wenn der Trigger richtig konfiguriert ist, wird die nachgelagerte Pipeline nach Abschluss der vorgelagerten Pipelines automatisch ausgeführt, basierend auf der von Ihnen konfigurierten Bedingung.

Nutzlastkonfiguration als Laufzeitargumente übergeben

Die Nutzlastkonfiguration ermöglicht die Freigabe von Informationen von der vorgelagerten Pipeline an die nachgelagerte Pipeline. Diese Informationen können beispielsweise das Ausgabeverzeichnis, das Datenformat oder der Tag sein, an dem die Pipeline ausgeführt wurde. Diese Informationen werden dann von der nachgelagerten Pipeline für Entscheidungen verwendet, z. B. zum Bestimmen des richtigen Datasets, aus dem gelesen werden soll.

Zur Übergabe von Informationen aus der vorgelagerten Pipeline legen Sie die Laufzeitargumente der nachgelagerten Pipeline mit den Werten der Laufzeitargumente oder der Konfiguration eines beliebigen Plug-ins in der vorgelagerten Pipeline fest.

Bei jeder Ausführung der nachgelagerten Pipeline wird deren Nutzlastkonfiguration mit den Laufzeitargumenten der jeweiligen Ausführung der vorgelagerten Pipeline festgelegt, die die nachgelagerte Pipeline ausgelöst hat.

So übergeben Sie die Nutzlastkonfiguration als Laufzeitargumente:

  1. Wenn Sie an der Stelle im Abschnitt Eingehenden Trigger erstellen fortfahren, wo Sie aufgehört haben, nachdem Sie auf Triggerkonfiguration geklickt haben, werden alle Laufzeitargumente angezeigt, die Sie für die Upstream-Pipeline zuvor festgelegt haben. Wählen Sie die Laufzeitargumente aus, die Sie von der vorgelagerten Pipeline zur nachgelagerten Pipeline übergeben möchten, wenn dieser Trigger ausgeführt wird.
  2. Klicken Sie auf den Tab Plug-in-Konfiguration, um eine Liste der Elemente zu öffnen, die von Ihrer vorgelagerten Pipeline an Ihre nachgelagerte Pipeline übergeben werden, wenn diese ausgelöst wird.
  3. Klicken Sie auf Trigger konfigurieren und aktivieren.