Cloud Storage-Connector mit Apache Spark verwenden

In dieser Anleitung erfahren Sie, wie Sie Beispielcode ausführen, der den Cloud Storage-Connector mit Apache Spark verwendet.

Ziele

Schreiben Sie einen einfachen Wordcount-Spark-Job in Java, Scala oder Python und führen Sie den Job dann in einem Dataproc-Cluster aus.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.

Neuen Nutzern von Google Cloud steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

Führen Sie die Schritte unten aus, um den Code für diese Anleitung vorzubereiten.

  1. Richten Sie ein Projekt ein. Richten Sie bei Bedarf ein Projekt mit aktivierten Dataproc-, Compute Engine- und Cloud Storage-APIs ein und installieren Sie die Google Cloud CLI auf Ihrem lokalen Computer.

    1. Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. Installieren Sie die Google Cloud CLI.

    9. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

    10. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    14. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. Installieren Sie die Google Cloud CLI.

    18. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

    19. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

      gcloud init

  2. Erstellen Sie einen Cloud Storage-Bucket. Sie benötigen Cloud Storage, um Anleitungsdaten zu speichern. Wenn Sie den Dienst nicht haben, können Sie einen neuen Bucket in Ihrem Projekt erstellen.

    1. Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets.

      Buckets aufrufen

    2. Klicken Sie auf Erstellen.
    3. Geben Sie auf der Seite Bucket erstellen die Bucket-Informationen ein. Klicken Sie auf Weiter, um mit dem nächsten Schritt fortzufahren.
      1. Führen Sie im Abschnitt Einstieg die folgenden Schritte aus:
        • Geben Sie einen global eindeutigen Namen ein, der den Anforderungen für Bucket-Namen entspricht.
        • So fügen Sie ein Bucket-Label hinzu: Maximieren Sie den Abschnitt Labels (), klicken Sie auf Label hinzufügen und geben Sie key und value für Ihr Label an.
      2. Gehen Sie im Bereich Speicherort für Daten auswählen so vor:
        1. Standorttyp auswählen.
        2. Wählen Sie im Drop-down-Menü Standorttyp einen Standort aus, an dem die Daten Ihres Buckets dauerhaft gespeichert werden.
          • Wenn Sie den Standorttyp Dual-Region auswählen, können Sie auch die Turboreplikation aktivieren, indem Sie das entsprechende Kästchen anklicken.
        3. Wenn Sie die Bucket-übergreifende Replikation einrichten möchten, wählen Sie Bucket-übergreifende Replikation über Storage Transfer Service hinzufügen aus und führen Sie die folgenden Schritte aus:

          Bucket-übergreifende Replikation einrichten

          1. Wählen Sie im Menü Bucket einen Bucket aus.
          2. Klicken Sie im Bereich Replikationseinstellungen auf Konfigurieren, um die Einstellungen für den Replikationsjob zu konfigurieren.

            Der Bereich Bucket-übergreifende Replikation konfigurieren wird angezeigt.

            • Wenn Sie die zu replizierenden Objekte nach dem Objektnamenspräfix filtern möchten, geben Sie ein Präfix ein, mit dem Sie Objekte ein- oder ausschließen möchten, und klicken Sie dann auf  Präfix hinzufügen.
            • Wenn Sie eine Speicherklasse für die replizierten Objekte festlegen möchten, wählen Sie im Menü Speicherklasse eine Speicherklasse aus. Wenn Sie diesen Schritt überspringen, wird für replizierte Objekte standardmäßig die Speicherklasse des Ziel-Buckets verwendet.
            • Klicken Sie auf Fertig.
      3. Gehen Sie im Bereich Speicherort für Daten auswählen so vor:
        1. Wählen Sie eine Standardspeicherklasse für den Bucket oder Autoclass für die automatische Speicherklassenverwaltung der Daten Ihres Buckets aus.
        2. Wenn Sie den hierarchischen Namespace aktivieren möchten, wählen Sie im Bereich Speicher für datenintensive Arbeitslasten optimieren die Option Hierarchischen Namespace für diesen Bucket aktivieren aus.
      4. Wählen Sie im Abschnitt Zugriff auf Objekte steuern aus, ob der Bucket Verhinderung des öffentlichen Zugriffs durchsetzt, und wählen Sie eine Zugriffssteuerungsmethode für die Objekte Ihres Buckets aus.
      5. Führen Sie im Bereich Auswählen, wie Objektdaten geschützt werden die folgenden Schritte aus:
        • Wählen Sie unter Datenschutz die gewünschten Optionen für Ihren Bucket aus.
          • Wenn Sie Vorläufiges Löschen aktivieren möchten, klicken Sie das Kästchen Richtlinie für vorläufiges Löschen (zur Datenwiederherstellung) an und geben Sie die Anzahl der Tage an, die Objekte nach dem Löschen beibehalten werden sollen.
          • Wenn Sie die Objektversionsverwaltung festlegen möchten, klicken Sie das Kästchen Objektversionsverwaltung (zur Datenwiederherstellung) an und geben Sie die maximale Anzahl von Versionen pro Objekt und die Anzahl der Tage an, nach denen die nicht aktuellen Versionen ablaufen.
          • Klicken Sie das Kästchen Aufbewahrung (für Compliance) an, um die Aufbewahrungsrichtlinie für Objekte und Buckets zu aktivieren, und gehen Sie dann so vor:
            • Klicken Sie auf das Kästchen Objektaufbewahrung aktivieren, um die Objektaufbewahrungssperre zu aktivieren.
            • Wenn Sie Bucket Lock aktivieren möchten, klicken Sie das Kästchen Bucket-Aufbewahrungsrichtlinie festlegen an und wählen Sie eine Zeiteinheit und eine Zeitdauer für die Aufbewahrungsdauer aus.
        • Um auszuwählen, wie Ihre Objektdaten verschlüsselt werden, maximieren Sie den Bereich Datenverschlüsselung () und wählen Sie eine Methode für die Datenverschlüsselung aus.
    4. Klicken Sie auf Erstellen.

  3. Legen Sie lokale Umgebungsvariablen fest. Legen Sie Umgebungsvariablen auf Ihrem lokalen Computer fest. Legen Sie Ihre Google Cloud project-id und den Namen des Cloud Storage-Bucket fest, den Sie für diese Anleitung verwenden werden. Geben Sie außerdem den Namen und die Region eines vorhandenen oder neuen Dataproc-Clusters an. Sie können im nächsten Schritt einen Cluster erstellen, der in dieser Anleitung verwendet werden soll.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Dataproc-Cluster erstellen. Führen Sie den folgenden Befehl aus, um einen Dataproc-Cluster mit einem einzelnen Knoten in der angegebenen Compute Engine-Zone zu erstellen.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Kopieren Sie öffentliche Daten in Ihren Cloud Storage-Bucket. Kopieren Sie ein öffentliches Data-Shakepee-Text-Snippet in den Ordner input Ihres Cloud Storage-Buckets:

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Java- (Apache Maven), Scala- (SBT) oder Python- Entwicklungsumgebung einrichten.

Spark-WordCount-Job vorbereiten

Wählen Sie unten einen Tab aus, um die Schritte zum Vorbereiten eines Jobpakets oder einer Datei zum Senden an den Cluster auszuführen. Sie können einen der folgenden Jobtypen vorbereiten:

Java

  1. pom.xml-Datei auf Ihren lokalen Computer kopieren. Die folgende pom.xml-Datei gibt Scala- und Spark-Bibliotheksabhängigkeiten an, denen ein provided-Bereich zugewiesen wird, um anzugeben, dass der Dataproc-Cluster diese Bibliotheken zur Laufzeit bereitstellt. Die pom.xml-Datei gibt keine Cloud Storage-Abhängigkeit an, da der Connector die standardmäßige HDFS-Schnittstelle implementiert. Wenn ein Spark-Job auf Cloud Storage-Clusterdateien zugreift (Dateien mit URIs, die mit gs:// beginnen), verwendet das System automatisch den Cloud Storage-Connector, um auf die Dateien in Cloud Storage zuzugreifen
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Den unten aufgeführten WordCount.java-Code auf Ihren lokalen Computer kopieren.
    1. Erstellen Sie einen Satz von Verzeichnissen mit dem Pfad src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Kopieren Sie WordCount.java auf Ihren lokalen Computer in src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java ist ein Spark-Job in Java, der Textdateien aus Cloud Storage liest, eine Wortzählung durchführt und dann die Ergebnisse der Textdatei in Cloud Storage schreibt.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Erstellen Sie das Paket.
    mvn clean package
    
    Ist dies erfolgreich, wird eine target/word-count-1.0.jar-Datei erstellt.
  4. Das Paket in Cloud Storage bereitstellen.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt-Datei auf Ihren lokalen Computer kopieren. Die folgende build.sbt-Datei gibt Scala- und Spark-Bibliotheksabhängigkeiten an, denen ein provided-Bereich zugewiesen wird, um anzugeben, dass der Dataproc-Cluster diese Bibliotheken zur Laufzeit bereitstellt. Die build.sbt-Datei gibt keine Cloud Storage-Abhängigkeit an, da der Connector die standardmäßige HDFS-Schnittstelle implementiert. Wenn ein Spark-Job auf Cloud Storage-Clusterdateien zugreift (Dateien mit URIs, die mit gs:// beginnen), verwendet das System automatisch den Cloud Storage-Connector, um auf die Dateien in Cloud Storage zuzugreifen
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. word-count.scala auf Ihren lokalen Computer kopieren. Dies ist ein Spark-Job in Java, der Textdateien aus Cloud Storage liest, eine Wortzählung ausführt und die Ergebnisse der Textdatei anschließend in Cloud Storage schreibt.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Erstellen Sie das Paket.
    sbt clean package
    
    Ist dies erfolgreich, wird eine target/scala-2.11/word-count_2.11-1.0.jar-Datei erstellt.
  4. Das Paket in Cloud Storage bereitstellen.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py auf Ihren lokalen Computer kopieren. Dies ist ein Spark-Job in Python mit PySpark, der Textdateien aus Cloud Storage liest, eine Wortzählung ausführt und die Ergebnisse der Textdatei anschließend in Cloud Storage schreibt.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Job senden

Führen Sie den folgenden gcloud-Befehl aus, um den Wordcount-Job an Ihren Dataproc-Cluster zu senden.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Ausgabe ansehen

Führen Sie nach Abschluss des Jobs den folgenden gcloud CLI-Befehl aus, um die Wordcount-Ausgabe aufzurufen.

gcloud storage cat gs://${BUCKET_NAME}/output/*

Es sollte etwa diese Ausgabe angezeigt werden:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Bereinigen

Nachdem Sie die Anleitung abgeschlossen haben, können Sie die erstellten Ressourcen bereinigen, damit sie keine Kontingente mehr nutzen und keine Gebühren mehr anfallen. In den folgenden Abschnitten erfahren Sie, wie Sie diese Ressourcen löschen oder deaktivieren.

Projekt löschen

Am einfachsten vermeiden Sie weitere Kosten durch Löschen des für die Anleitung erstellten Projekts.

So löschen Sie das Projekt:

  1. Wechseln Sie in der Google Cloud -Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Dataproc-Cluster löschen

Wenn Sie Ihr Projekt löschen möchten, sollten Sie nur Ihren Cluster innerhalb des Projekts löschen.

Cloud Storage-Bucket löschen

Google Cloud Console

  1. Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets.

    Buckets aufrufen

  2. Klicken Sie auf das Kästchen neben dem Bucket, der gelöscht werden soll.
  3. Klicken Sie zum Löschen des Buckets auf Löschen und folgen Sie der Anleitung.

Befehlszeile

    Bucket löschen:
    gcloud storage buckets delete BUCKET_NAME

Nächste Schritte