Dataflow-Pipeline mit Python erstellen

In diesem Dokument erfahren Sie, wie Sie das Apache Beam SDK für Python verwenden, um ein Programm zu erstellen, das eine Pipeline definiert. Anschließend führen Sie die Pipeline mit einem direkten lokalen Runner oder einem cloudbasierten Runner wie Dataflow aus. Eine Einführung in die WordCount-Pipeline finden Sie im Video How to use WordCount in Apache Beam.


Eine detaillierte Anleitung dazu finden Sie direkt in der Google Cloud Console. Klicken Sie dazu einfach auf Anleitung:

Anleitung


Hinweis

  1. Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.

  3. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  4. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  5. Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

  6. Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

  7. Aktivieren Sie die Dataflow API, die Compute Engine API, die Cloud Logging API, die Cloud Storage API, die Google Cloud Storage JSON API, die BigQuery API, die Cloud Pub/Sub API, die Cloud Datastore API und die Cloud Resource Manager API:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:

    gcloud auth application-default login

    Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.

  9. Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel: myemail@example.com
    • ROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.
  10. Installieren Sie die Google Cloud CLI.

  11. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  12. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  13. Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

  14. Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

  15. Aktivieren Sie die Dataflow API, die Compute Engine API, die Cloud Logging API, die Cloud Storage API, die Google Cloud Storage JSON API, die BigQuery API, die Cloud Pub/Sub API, die Cloud Datastore API und die Cloud Resource Manager API:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:

    gcloud auth application-default login

    Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.

  17. Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel: myemail@example.com
    • ROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.
  18. Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie PROJECT_NUMBER durch die Projekt-ID. Ihre Projektnummer finden Sie unter Projekte identifizieren oder verwenden Sie den Befehl gcloud projects describe.
    • Ersetzen Sie SERVICE_ACCOUNT_ROLE durch jede einzelne Rolle.
  19. Erstellen Sie einen Cloud Storage-Bucket und konfigurieren Sie ihn so:
    • Legen Sie die Speicherklasse auf S (Standard) fest.
    • Legen Sie als Speicherort Folgendes fest: US (USA).
    • Ersetzen Sie BUCKET_NAME durch einen eindeutigen Bucket-Namen. Der Bucket-Name darf keine vertraulichen Informationen enthalten, da der Bucket-Namespace global und öffentlich sichtbar ist.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  20. Kopieren Sie die Google Cloud Projekt-ID und den Namen des Cloud Storage-Bucket. Sie benötigen diese Werte später in diesem Dokument.

Umgebung einrichten

In diesem Abschnitt richten Sie über die Eingabeaufforderung eine isolierte virtuelle Python-Umgebung ein, um Ihr Pipeline-Projekt mit venv auszuführen. Auf diese Weise können Sie die Abhängigkeiten eines Projekts von den Abhängigkeiten anderer Projekte isolieren.

Wenn Ihnen im Moment keine Eingabeaufforderung zur Verfügung steht, können Sie Cloud Shell verwenden. Der Paketmanager für Python 3 ist in Cloud Shell bereits installiert, sodass Sie mit dem Erstellen einer virtuellen Umgebung fortfahren können.

So installieren Sie Python und erstellen dann eine virtuelle Umgebung:

  1. Prüfen Sie, ob Python 3 und pip in Ihrem System ausgeführt werden:
    python --version
    python -m pip --version
  2. Installieren Sie gegebenenfalls Python 3 und richten Sie dann eine virtuelle Python-Umgebung ein. Folgen Sie dazu der Anleitung in den Abschnitten Python installieren und venv einrichten auf der Seite Python-Entwicklungsumgebung einrichten.

Nachdem Sie die Kurzanleitung durchgearbeitet haben, können Sie die virtuelle Umgebung mit dem Befehl deactivate deaktivieren.

Apache Beam SDK abrufen

Das Apache Beam SDK ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren eine Pipeline mit einem Apache Beam-Programm und wählen dann einen Runner wie Dataflow aus, um Ihre Pipeline auszuführen.

So laden Sie das Apache Beam SDK herunter und installieren es:

  1. Prüfen Sie, ob Sie sich in der virtuellen Python-Umgebung befinden, die Sie im vorherigen Abschnitt erstellt haben. Die Eingabeaufforderung beginnt mit <env_name>, wobei env_name der Name der virtuellen Umgebung ist.
  2. Installieren Sie die neueste Version des Apache Beam SDK für Python:
  3. pip install apache-beam[gcp]

Pipeline lokal ausführen

Wenn Sie sehen möchten, wie eine Pipeline lokal ausgeführt wird, verwenden Sie ein fertiges Python-Modul für das Beispiel wordcount, das im Paket apache_beam enthalten ist.

Das Pipeline-Beispiel wordcount führt Folgendes aus:

  1. Sie nimmt eine Textdatei als Eingabe an.

    Sie finden die Textdatei in einem Cloud Storage-Bucket mit dem Ressourcennamen gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Sie parst jede Zeile und unterteilt sie in Wörter.
  3. Sie misst die Häufigkeit der tokenisierten Wörter.

Führen Sie die folgenden Schritte aus, um die Pipeline wordcount lokal bereitzustellen:

  1. Führen Sie auf Ihrem lokalen Terminal das Beispiel wordcount aus:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Sehen Sie sich die Ausgabe der Pipeline an:
    more outputs*
  3. Drücken Sie zum Beenden q.
Wenn Sie die Pipeline lokal ausführen, können Sie das Apache Beam-Programm testen und eventuelle Fehler beheben. Sie können den Quellcode von wordcount.py auf dem GitHub für Apache Beam ansehen.

Pipeline im Dataflow-Dienst ausführen

In diesem Abschnitt führen Sie die wordcount-Beispielpipeline aus dem Paket apache_beam im Dataflow-Dienst aus. In diesem Beispiel wird DataflowRunner als Parameter für --runner angegeben.
  • Führen Sie die Pipeline aus:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Ersetzen Sie Folgendes:

    • DATAFLOW_REGION: Die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. europe-west1

      Das Flag --region überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.

    • BUCKET_NAME: der Name des Cloud Storage-Bucket, den Sie zuvor kopiert haben
    • PROJECT_ID: die Google Cloud Projekt-ID, die Sie zuvor kopiert haben

Ergebnisse ansehen

Wenn Sie eine Pipeline in Dataflow ausführen, werden die Ergebnisse in einem Cloud Storage-Bucket gespeichert. Prüfen Sie in diesem Abschnitt, ob die Pipeline mit der Google Cloud Console oder dem lokalen Terminal ausgeführt wird.

Google Cloud Console

So rufen Sie Ihre Ergebnisse in der Google Cloud Console auf:

  1. Rufen Sie in der Google Cloud Console die Dataflow-Seite Jobs auf.

    Zu Jobs

    Auf der Seite Jobs werden Details zum wordcount-Job angezeigt, z. B. der Status Aktiv und dann Erfolgreich.

  2. Rufen Sie die Seite Cloud Storage-Buckets auf:

    Buckets aufrufen

  3. Klicken Sie in der Liste der Buckets in Ihrem Projekt auf den Storage-Bucket, den Sie zuvor erstellt haben.

    Im Verzeichnis wordcount werden die von Ihrem Job erstellten Ausgabedateien angezeigt.

Lokales Terminal

Sehen Sie sich die Ergebnisse über Ihr Terminal oder mithilfe von Cloud Shell an.

  1. Verwenden Sie den Befehl gcloud storage ls, um die Ausgabedateien aufzulisten:
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. Ersetzen Sie BUCKET_NAME durch den Namen des Cloud Pipeline-Buckets, der im Pipelineprogramm verwendet wird.

  3. Verwenden Sie den Befehl gcloud storage cat, um die Ergebnisse in den Ausgabedateien aufzurufen:
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Pipelinecode ändern

Die wordcount-Pipeline in den vorherigen Beispielen unterscheidet zwischen groß- und kleingeschriebenen Wörtern. In den folgenden Schritten wird gezeigt, wie Sie die Pipeline so ändern, dass die Groß- und Kleinschreibung bei der wordcount-Pipeline nicht berücksichtigt wird.
  1. Laden Sie auf Ihrem lokalen Computer die neueste Kopie des wordcount-Codes aus dem Apache Beam GitHub-Repository herunter.
  2. Führen Sie die Pipeline über das lokale Terminal aus:
    python wordcount.py --output outputs
  3. Rufen Sie die Ergebnisse auf:
    more outputs*
  4. Drücken Sie zum Beenden q.
  5. Öffnen Sie die Datei wordcount.py in einem Editor Ihrer Wahl.
  6. Sehen Sie sich die Pipelineschritte in der Funktion run an:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Nach split werden die Zeilen in Wörter als Strings unterteilt.

  7. Wenn Sie die Strings in Kleinbuchstaben darstellen möchten, ändern Sie die Zeile nach split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Durch diese Änderung wird die Funktion str.lower jedem Wort zugeordnet. Diese Zeile entspricht beam.Map(lambda word: str.lower(word)).
  8. Speichern Sie die Datei und führen Sie den geänderten Job wordcount aus:
    python wordcount.py --output outputs
  9. Sehen Sie sich die Ergebnisse der geänderten Pipeline an:
    more outputs*
  10. Drücken Sie zum Beenden q.
  11. Führen Sie die geänderte Pipeline im Dataflow-Dienst aus:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Ersetzen Sie Folgendes:

    • DATAFLOW_REGION: die Region, in der Sie den Dataflow-Job bereitstellen möchten
    • BUCKET_NAME: Name Ihres Cloud Storage-Buckets.
    • PROJECT_ID: Ihre Google Cloud Projekt-ID

Bereinigen

Löschen Sie das Projekt von Google Cloud zusammen mit den Ressourcen, damit Ihrem Konto von Google Cloud die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.

  1. Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets.

    Buckets aufrufen

  2. Klicken Sie auf das Kästchen neben dem Bucket, der gelöscht werden soll.
  3. Klicken Sie zum Löschen des Buckets auf Löschen und folgen Sie der Anleitung.
  4. Wenn Sie Ihr Projekt beibehalten, widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:

    gcloud auth application-default revoke
  6. Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.

    gcloud auth revoke

Nächste Schritte