Dataflow-Pipeline mit Go erstellen
Auf dieser Seite erfahren Sie, wie Sie mit dem Apache Beam SDK für Go ein Programm erstellen, das eine Pipeline definiert. Anschließend führen Sie die Pipeline lokal und im Dataflow-Dienst aus. Eine Einführung in die WordCount-Pipeline finden Sie im Video How to use WordCount in Apache Beam.
Hinweis
- Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
-
Installieren Sie die Google Cloud CLI.
-
Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.
-
Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init -
Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.
Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind
- Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
-
Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (
roles/resourcemanager.projectCreator), die die Berechtigungresourcemanager.projects.createenthält. Weitere Informationen zum Zuweisen von Rollen
-
So erstellen Sie ein Google Cloud -Projekt:
gcloud projects create PROJECT_ID
Ersetzen Sie
PROJECT_IDdurch einen Namen für das Google Cloud -Projekt, das Sie erstellen. -
Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:
gcloud config set project PROJECT_ID
Ersetzen Sie
PROJECT_IDdurch den Namen Ihres Projekts in Google Cloud .
-
Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.
Aktivieren Sie die Dataflow API, die Compute Engine API, die Cloud Logging API, die Cloud Storage API, die Google Cloud Storage JSON API und die Cloud Resource Manager API:
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (
roles/serviceusage.serviceUsageAdmin), die die Berechtigungserviceusage.services.enableenthält. Weitere Informationen zum Zuweisen von Rollengcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:
gcloud auth application-default login
Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.
-
Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Ersetzen Sie Folgendes:
PROJECT_ID: Ihre Projekt-ID.USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel:myemail@example.comROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.
-
Installieren Sie die Google Cloud CLI.
-
Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.
-
Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init -
Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.
Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind
- Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
-
Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (
roles/resourcemanager.projectCreator), die die Berechtigungresourcemanager.projects.createenthält. Weitere Informationen zum Zuweisen von Rollen
-
So erstellen Sie ein Google Cloud -Projekt:
gcloud projects create PROJECT_ID
Ersetzen Sie
PROJECT_IDdurch einen Namen für das Google Cloud -Projekt, das Sie erstellen. -
Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:
gcloud config set project PROJECT_ID
Ersetzen Sie
PROJECT_IDdurch den Namen Ihres Projekts in Google Cloud .
-
Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.
Aktivieren Sie die Dataflow API, die Compute Engine API, die Cloud Logging API, die Cloud Storage API, die Google Cloud Storage JSON API und die Cloud Resource Manager API:
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (
roles/serviceusage.serviceUsageAdmin), die die Berechtigungserviceusage.services.enableenthält. Weitere Informationen zum Zuweisen von Rollengcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:
gcloud auth application-default login
Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.
-
Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Ersetzen Sie Folgendes:
PROJECT_ID: Ihre Projekt-ID.USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel:myemail@example.comROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.
Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Ersetzen Sie
PROJECT_IDdurch Ihre Projekt-ID. - Ersetzen Sie
PROJECT_NUMBERdurch die Projekt-ID. Ihre Projektnummer finden Sie unter Projekte identifizieren oder verwenden Sie den Befehlgcloud projects describe. - Ersetzen Sie
SERVICE_ACCOUNT_ROLEdurch jede einzelne Rolle.
-
Erstellen Sie einen Cloud Storage-Bucket und konfigurieren Sie ihn so:
-
Legen Sie die Speicherklasse auf
S(Standard) fest. -
Legen Sie als Speicherort Folgendes fest:
US(USA). -
Ersetzen Sie
BUCKET_NAMEdurch einen eindeutigen Bucket-Namen. Der Bucket-Name darf keine vertraulichen Informationen enthalten, da der Bucket-Namespace global und öffentlich sichtbar ist.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Legen Sie die Speicherklasse auf
- Kopieren Sie die Google Cloud Projekt-ID und den Namen des Cloud Storage-Bucket. Sie benötigen diese Werte später in dieser Kurzanleitung.
Entwicklungsumgebung einrichten
Das Apache Beam SDK ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren eine Pipeline mit einem Apache Beam-Programm und wählen dann einen Runner wie Dataflow aus, um Ihre Pipeline auszuführen.
Wir empfehlen die Verwendung der aktuellen Version von Go, wenn Sie mit dem Apache Beam SDK für Go arbeiten. Wenn Sie die aktuelle Version von Go nicht installiert haben, können Sie Go im Leitfaden zum Herunterladen und Installieren von Go für Ihr Betriebssystem verwenden.
Führen Sie den folgenden Befehl in Ihrem lokalen Terminal aus, um die Version von Go zu prüfen, die Sie installiert haben:
go versionBeispiel für Beam-Wordcount ausführen
Das Apache Beam SDK für Go enthält ein wordcount-Pipeline-Beispiel.
Das wordcount-Beispiel führt Folgendes aus:
- Liest eine Textdatei als Eingabe. Standardmäßig wird eine Textdatei in einem Cloud Storage-Bucket mit dem Ressourcennamen
gs://dataflow-samples/shakespeare/kinglear.txtgelesen. - Sie parst jede Zeile und unterteilt sie in Wörter.
- Sie misst die Häufigkeit der tokenisierten Wörter.
Führen Sie die folgenden Schritte aus, um die neueste Version des Beam-wordcount-Beispiels auf Ihrem lokalen Computer auszuführen.
Klonen Sie das GitHub-Repository
apache/beammit dem Befehlgit clone:git clone https://github.com/apache/beam.gitWechseln Sie in das Verzeichnis
beam/sdks/go:cd beam/sdks/goVerwenden Sie den folgenden Befehl, um die Pipeline auszuführen:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsDas Flag
inputgibt die zu lesende Datei an und das Flagoutputgibt den Dateinamen für die Ausgabe der Häufigkeit an.
Sehen Sie sich nach Abschluss der Pipeline die Ausgabeergebnisse an:
more outputs*Drücken Sie zum Beenden q.
Pipelinecode ändern
Die Beam-wordcount-Pipeline unterscheidet zwischen Groß- und Kleinbuchstaben. Die folgenden Schritte zeigen, wie Sie Ihr eigenes Go-Modul erstellen, die wordcount-Pipeline so ändern, dass sie Groß- und Kleinschreibung nicht beachtet, und sie auf Dataflow ausführen.
Go-Modul erstellen
Gehen Sie so vor, um Änderungen am Pipelinecode vorzunehmen.
Erstellen Sie ein Verzeichnis für Ihr Go-Modul an einem Speicherort Ihrer Wahl:
mkdir wordcountcd wordcountGo-Modul erstellen Verwenden Sie für dieses Beispiel
example/dataflowals Modulpfad.go mod init example/dataflowLaden Sie die neueste Version des Codes
wordcountaus dem Apache Beam GitHub-Repository herunter. Legen Sie diese Datei in dem Verzeichniswordcountab, das Sie erstellt haben.Wenn Sie ein Nicht-Linux-Betriebssystem verwenden, müssen Sie das Go-Paket
unixabrufen. Dieses Paket ist erforderlich, um Pipelines im Dataflow-Dienst auszuführen.go get -u golang.org/x/sys/unixPrüfen Sie, ob die Datei
go.modmit dem Quellcode des Moduls übereinstimmt:go mod tidy
Unveränderte Pipeline ausführen
Prüfen Sie, ob die unveränderte wordcount-Pipeline lokal ausgeführt wird.
Erstellen Sie die Pipeline über das Terminal und führen Sie sie lokal aus:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsAusgabeergebnisse ansehen:
more outputs*Drücken Sie zum Beenden q.
Pipelinecode ändern
Wenn Sie die Pipeline so ändern möchten, dass sie nicht mehr zwischen Groß- und Kleinschreibung unterscheidet, ändern Sie den Code so, dass die Funktion strings.ToLower auf alle Wörter angewendet wird.
Öffnen Sie die Datei
wordcount.goin einem Editor Ihrer Wahl.Sehen Sie sich den
init-Block an (Kommentare wurden entfernt, um die Übersichtlichkeit zu verbessern):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }Fügen Sie eine neue Zeile hinzu, um die Funktion
strings.ToLowerzu registrieren:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }Sehen Sie sich die Funktion
CountWordsan:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }Fügen Sie zum Entfernen der Kleinbuchstaben ein ParDo hinzu, das
strings.ToLowerauf jedes Wort anwendet:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }Speichern Sie die Datei.
Aktualisierte Pipeline lokal ausführen
Führen Sie die aktualisierte wordcount-Pipeline lokal aus und prüfen Sie, ob sich die Ausgabe geändert hat.
Erstellen Sie die geänderte
wordcount-Pipeline und führen Sie sie aus:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsSehen Sie sich die Ausgabeergebnisse der geänderten Pipeline an. Alle Wörter sollten Kleinbuchstaben sein.
more outputs*Drücken Sie zum Beenden q.
Pipeline im Dataflow-Dienst ausführen
Führen Sie den folgenden Befehl aus, um das aktualisierte wordcount-Beispiel im Dataflow-Dienst auszuführen:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/Dabei gilt:
BUCKET_NAME: der Name des Cloud Storage-Buckets.PROJECT_ID: die Google Cloud Projekt-ID.DATAFLOW_REGION: die Region, in der Sie den Dataflow-Job bereitstellen möchten. Beispiel:europe-west1Eine Liste der verfügbaren Dataflow-Standorte finden Sie hier. Das Flag--regionüberschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.
Ergebnisse ansehen
Sie können eine Liste Ihrer Dataflow-Jobs in derGoogle Cloud -Konsole aufrufen. Rufen Sie in der Google Cloud Console die Dataflow-Seite Jobs auf.
Auf der Seite Jobs werden Details zum wordcount-Job angezeigt, z. B. der Status Aktiv und dann Erfolgreich.
Wenn Sie eine Pipeline mit Dataflow ausführen, werden die Ergebnisse in einem Cloud Storage-Bucket gespeichert. Sehen Sie sich die Ausgabeergebnisse entweder mit derGoogle Cloud -Konsole oder mit dem lokalen Terminal an.
Console
Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets, um Ihre Ergebnisse anzusehen.
Klicken Sie in der Liste der Buckets in Ihrem Projekt auf den Storage-Bucket, den Sie zuvor erstellt haben. Die von Ihrem Job erstellten Ausgabedateien werden im Verzeichnis results angezeigt.
Terminal
Sehen Sie sich die Ergebnisse über Ihr Terminal oder mithilfe von Cloud Shell an.
Verwenden Sie den Befehl
gcloud storage ls, um die Ausgabedateien aufzulisten:gcloud storage ls gs://BUCKET_NAME/results/outputs* --longErsetzen Sie
BUCKET_NAMEdurch den Namen des angegebenen Cloud Storage-Ausgabe-Buckets.Verwenden Sie den Befehl
gcloud storage cat, um die Ergebnisse in den Ausgabedateien aufzurufen:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Bereinigen
Löschen Sie das Projekt von Google Cloud zusammen mit den Ressourcen, damit Ihrem Konto von Google Cloud die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
- Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets.
- Klicken Sie auf das Kästchen neben dem Bucket, der gelöscht werden soll.
- Klicken Sie zum Löschen des Buckets auf Löschen und folgen Sie der Anleitung.
Wenn Sie Ihr Projekt beibehalten, widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:
gcloud auth application-default revoke
-
Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.
gcloud auth revoke