Streaming-Pipeline mit einer Dataflow-Vorlage erstellen

In dieser Kurzanleitung erfahren Sie, wie Sie anhand einer von Google bereitgestellten Dataflow-Vorlage eine Streaming-Pipeline erstellen. Dabei wird speziell auf die Vorlage Pub/Sub für BigQuery Bezug genommen.

Die Vorlage „Pub/Sub für BigQuery“ ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Thema lesen und in eine BigQuery-Tabelle schreiben kann.


Eine detaillierte Anleitung dazu finden Sie direkt in der Google Cloud Console. Klicken Sie dazu einfach auf Anleitung:

Anleitung


Hinweis

Führen Sie die folgenden Schritte aus, bevor Sie die Pipeline ausführen.

Projekt einrichten

  1. Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Erforderliche Rollen

Für diese Kurzanleitung benötigen Sie die folgenden IAM-Rollen (Identity and Access Management).

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen der Kurzanleitung benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Bitten Sie Ihren Administrator, dem Compute Engine-Standarddienstkonto die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, damit das Compute Engine-Standarddienstkonto die erforderlichen Berechtigungen zum Ausführen des Dataflow-Jobs hat:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Ihr Administrator kann dem Compute Engine-Standarddienstkonto möglicherweise auch die erforderlichen Berechtigungen über benutzerdefinierte Rollen oder andere vordefinierte Rollen erteilen.

Cloud Storage-Bucket erstellen

Bevor Sie eine Pipeline ausführen können, müssen Sie einen Cloud Storage-Bucket erstellen.

  1. Erstellen Sie einen Cloud Storage-Bucket:

    1. Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets.

      Buckets aufrufen

    2. Klicken Sie auf Erstellen.
    3. Geben Sie auf der Seite Bucket erstellen die Bucket-Informationen ein. Klicken Sie auf Weiter, um mit dem nächsten Schritt fortzufahren.
      1. Geben Sie unter Bucket benennen einen eindeutigen Bucket-Namen ein. Der Bucket-Name darf keine vertraulichen Informationen enthalten, da der Bucket-Namespace global und öffentlich sichtbar ist.
      2. Gehen Sie im Bereich Speicherort für Daten auswählen so vor:
        1. Standorttyp auswählen.
        2. Wählen Sie im Drop-down-Menü Standorttyp einen Standort aus, an dem die Daten Ihres Buckets dauerhaft gespeichert werden.
          • Wenn Sie den Standorttyp Dual-Region auswählen, können Sie auch die Turboreplikation aktivieren, indem Sie das entsprechende Kästchen anklicken.
        3. Wenn Sie die Bucket-übergreifende Replikation einrichten möchten, wählen Sie Bucket-übergreifende Replikation über Storage Transfer Service hinzufügen aus und führen Sie die folgenden Schritte aus:

          Bucket-übergreifende Replikation einrichten

          1. Wählen Sie im Menü Bucket einen Bucket aus.
          2. Klicken Sie im Bereich Replikationseinstellungen auf Konfigurieren, um die Einstellungen für den Replikationsjob zu konfigurieren.

            Der Bereich Bucket-übergreifende Replikation konfigurieren wird angezeigt.

            • Wenn Sie die zu replizierenden Objekte nach dem Objektnamenspräfix filtern möchten, geben Sie ein Präfix ein, mit dem Sie Objekte ein- oder ausschließen möchten, und klicken Sie dann auf  Präfix hinzufügen.
            • Wenn Sie eine Speicherklasse für die replizierten Objekte festlegen möchten, wählen Sie im Menü Speicherklasse eine Speicherklasse aus. Wenn Sie diesen Schritt überspringen, wird für replizierte Objekte standardmäßig die Speicherklasse des Ziel-Buckets verwendet.
            • Klicken Sie auf Fertig.
      3. Gehen Sie im Bereich Speicherort für Daten auswählen so vor:
        1. Wählen Sie im Bereich Standardklasse festlegen die Option Standard aus.
        2. Wenn Sie den hierarchischen Namespace aktivieren möchten, wählen Sie im Bereich Speicher für datenintensive Arbeitslasten optimieren die Option Hierarchischen Namespace für diesen Bucket aktivieren aus.
      4. Wählen Sie im Abschnitt Zugriff auf Objekte steuern aus, ob der Bucket Verhinderung des öffentlichen Zugriffs durchsetzt, und wählen Sie eine Zugriffssteuerungsmethode für die Objekte Ihres Buckets aus.
      5. Führen Sie im Bereich Auswählen, wie Objektdaten geschützt werden die folgenden Schritte aus:
        • Wählen Sie unter Datenschutz die gewünschten Optionen für Ihren Bucket aus.
          • Wenn Sie Vorläufiges Löschen aktivieren möchten, klicken Sie das Kästchen Richtlinie für vorläufiges Löschen (zur Datenwiederherstellung) an und geben Sie die Anzahl der Tage an, die Objekte nach dem Löschen beibehalten werden sollen.
          • Wenn Sie die Objektversionsverwaltung festlegen möchten, klicken Sie das Kästchen Objektversionsverwaltung (zur Datenwiederherstellung) an und geben Sie die maximale Anzahl von Versionen pro Objekt und die Anzahl der Tage an, nach denen die nicht aktuellen Versionen ablaufen.
          • Klicken Sie das Kästchen Aufbewahrung (für Compliance) an, um die Aufbewahrungsrichtlinie für Objekte und Buckets zu aktivieren, und gehen Sie dann so vor:
            • Klicken Sie auf das Kästchen Objektaufbewahrung aktivieren, um die Objektaufbewahrungssperre zu aktivieren.
            • Wenn Sie Bucket Lock aktivieren möchten, klicken Sie das Kästchen Bucket-Aufbewahrungsrichtlinie festlegen an und wählen Sie eine Zeiteinheit und eine Zeitdauer für die Aufbewahrungsdauer aus.
        • Um auszuwählen, wie Ihre Objektdaten verschlüsselt werden, maximieren Sie den Bereich Datenverschlüsselung () und wählen Sie eine Methode für die Datenverschlüsselung aus.
    4. Klicken Sie auf Erstellen.
  2. Kopieren Sie Folgendes, was Sie in einem späteren Abschnitt benötigen:

    • : Name Ihres Cloud Storage-Buckets
    • Ihre Google Cloud -Projekt-ID.

    Diese ID finden Sie unter Projekte identifizieren.

VPC-Netzwerk

Standardmäßig beginnt jedes neue Projekt mit einem Standardnetzwerk. Wenn das Standardnetzwerk für Ihr Projekt deaktiviert oder gelöscht wurde, benötigen Sie in Ihrem Projekt ein Netzwerk, für das Ihr Nutzerkonto die Rolle Compute-Netzwerknutzer (roles/compute.networkUser) hat.

BigQuery-Dataset und -Tabelle erstellen

Erstellen Sie in der Google Cloud Console ein BigQuery-Dataset und eine BigQuery-Tabelle mit dem entsprechenden Schema für das Pub/Sub-Thema.

In diesem Beispiel lautet der Name des Datasets taxirides und der Name der Tabelle realtime. So erstellen Sie dieses Dataset und diese Tabelle:

  1. Rufen Sie die Seite BigQuery auf.
    BigQuery aufrufen
  2. Klicken Sie im Steuerfeld Explorer neben dem Projekt, in dem Sie das Dataset erstellen möchten, auf Aktionen anzeigen und dann auf Dataset erstellen.
  3. Führen Sie im Bereich Dataset erstellen die folgenden Schritte aus:
    1. Geben Sie unter Dataset-ID taxirides ein. Dataset-IDs sind für jedes Google Cloud Projekt eindeutig.
    2. Wählen Sie als Standorttyp die Option Mehrere Regionen und dann USA (mehrere Regionen in den USA) aus. Öffentliche Datasets sind am multiregionalen Standort US gespeichert. Der Einfachheit halber sollten Sie Ihr Dataset an diesem Speicherort ablegen.
    3. Übernehmen Sie die anderen Standardeinstellungen und klicken Sie auf Dataset erstellen.
  4. Maximieren Sie im Bereich Explorer Ihr Projekt.
  5. Klicken Sie neben dem Dataset taxirides auf Aktionen anzeigen und dann auf Tabelle erstellen.
  6. Führen Sie im Bereich Tabelle erstellen die folgenden Schritte aus:
    1. Wählen Sie im Abschnitt Quelle unter Tabelle erstellen aus die Option Leere Tabelle aus.
    2. Geben Sie im Abschnitt Ziel unter Tabelle den Wert realtime ein.
    3. Klicken Sie im Abschnitt Schema auf das Optionsfeld Als Text bearbeiten und fügen Sie die folgende Schemadefinition in das Feld ein:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. Wählen Sie unter Partitionierung und Clustereinstellungen für Partitionierung das Feld Zeitstempel aus.
  7. Übernehmen Sie die anderen Standardeinstellungen und klicken Sie auf Tabelle erstellen.

Pipeline ausführen

Führen Sie eine Streamingpipeline mit der von Google bereitgestellten Vorlage „Cloud Pub/Sub für BigQuery“ aus. Die Pipeline erhält eingehende Daten aus dem Eingabethema.

  1. Rufen Sie die Dataflow-Seite Jobs auf.
    ZU JOBS
  2. Klicken Sie auf Job aus Vorlage erstellen.
  3. Geben Sie taxi-data als Jobname für Ihren Dataflow-Job ein.
  4. Wählen Sie für die Dataflow-Vorlage die Vorlage Pub/Sub für BigQuery aus.
  5. Geben Sie für BigQuery-Ausgabetabelle Folgendes ein:
    PROJECT_ID:taxirides.realtime

    Ersetzen Sie dabei PROJECT_ID durch die Projekt-ID des Projekts, in dem Sie das BigQuery-Dataset erstellt haben.

  6. Klicken Sie im Bereich Optionale Quellparameter im Feld Pub/Sub-Eingabethema auf Thema manuell eingeben.
  7. Geben Sie im Dialog unter Themenname Folgendes ein und klicken Sie dann auf Speichern:
    projects/pubsub-public-data/topics/taxirides-realtime

    Dieses öffentlich verfügbare Pub/Sub-Thema basiert auf dem offenen Dataset der NYC Taxi & Limousine Commission. Im Folgenden finden Sie eine Beispielnachricht aus diesem Thema im JSON-Format:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. Geben Sie unter Temporärer Standort Folgendes ein:
    gs://BUCKET_NAME/temp/

    Ersetzen Sie BUCKET_NAME durch den Namen des Cloud Storage-Buckets. Der Ordner temp speichert temporäre Dateien, z. B. den bereitgestellten Pipelinejob.

  9. Wenn Ihr Projekt kein Standardnetzwerk hat, geben Sie ein Netzwerk und ein Subnetzwerk ein. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben.
  10. Klicken Sie auf Job ausführen.

Ergebnisse ansehen

So können Sie sich die in die Tabelle realtime geschriebenen Daten ansehen:

  1. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  2. Klicken Sie auf Neue Abfrage erstellen. Ein neuer Tab Editor wird geöffnet.

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Ersetzen Sie dabei PROJECT_ID durch die Projekt-ID des Projekts, in dem Sie das BigQuery-Dataset erstellt haben. Es kann bis zu fünf Minuten dauern, bis Daten in der Tabelle angezeigt werden.

  3. Klicken Sie auf Ausführen.

    Die Abfrage gibt Zeilen zurück, die in den letzten 24 Stunden zu Ihrer Tabelle hinzugefügt wurden. Sie können Abfragen auch mit Standard-SQL ausführen.

Bereinigen

Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud -Konto die auf dieser Seite verwendeten Ressourcen in Rechnung gestellt werden:

Projekt löschen

Am einfachsten vermeiden Sie weitere Kosten, indem Sie das für den Schnellstart erstellte Projekt Google Cloud löschen.

  1. Wechseln Sie in der Google Cloud -Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

Wenn Sie das in dieser Kurzanleitung verwendete Google Cloud -Projekt beibehalten möchten, löschen Sie die einzelnen Ressourcen:

  1. Rufen Sie die Dataflow-Seite Jobs auf.
    Gehe zu Jobs
  2. Wählen Sie den Streaming-Job aus der Jobliste aus.
  3. Klicken Sie im Navigationsbereich auf Beenden.
  4. Geben Sie im Dialogfeld Job anhalten entweder Pipeline abbrechen oder per Drain beenden ein und klicken Sie dann auf Job beenden.
  5. Rufen Sie die Seite BigQuery auf.
    BigQuery aufrufen
  6. Maximieren Sie im Bereich Explorer Ihr Projekt.
  7. Klicken Sie neben dem Dataset, das Sie löschen möchten, auf Aktionen ansehen und dann auf Öffnen.
  8. Klicken Sie im Detailbereich auf Dataset löschen und folgen Sie der Anleitung.
  9. Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets.

    Buckets aufrufen

  10. Klicken Sie auf das Kästchen neben dem Bucket, der gelöscht werden soll.
  11. Klicken Sie zum Löschen des Buckets auf Löschen und folgen Sie der Anleitung.

Nächste Schritte