Utilizzo del connettore Cloud Storage con Apache Spark

Questo tutorial mostra come eseguire il codice di esempio che utilizza il connettore Cloud Storage con Apache Spark.

Obiettivi

Scrivi un semplice job Spark di conteggio parole in Java, Scala o Python, quindi esegui il job su un cluster Dataproc.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.

I nuovi utenti di Google Cloud potrebbero avere diritto a una prova senza costi.

Prima di iniziare

Segui i passaggi riportati di seguito per prepararti a eseguire il codice in questo tutorial.

  1. Configura il progetto. Se necessario, configura un progetto con le API Dataproc, Compute Engine e Cloud Storage abilitate e Google Cloud CLI installata sulla tua macchina locale.

    1. Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Abilita le API Dataproc, Compute Engine e Cloud Storage.

      Ruoli richiesti per abilitare le API

      Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

      Abilita le API

    5. Crea un service account:

      1. Assicurati di disporre del ruolo IAM Creazione account di servizio (roles/iam.serviceAccountCreator) e del ruolo Amministratore IAM progetto (roles/resourcemanager.projectIamAdmin). Scopri come concedere i ruoli.
      2. Nella console Google Cloud vai a Crea service account.

        Vai a Crea service account
      3. Seleziona il progetto.
      4. Nel campo Nome service account, inserisci un nome. La console Google Cloud compila il campo ID service account in base a questo nome.

        Nel campo Descrizione service account, inserisci una descrizione. Ad esempio, Service account for quickstart.

      5. Fai clic su Crea e continua.
      6. Concedi il ruolo Progetto > Proprietario al account di servizio.

        Per concedere il ruolo, trova l'elenco Seleziona un ruolo, quindi seleziona Progetto > Proprietario.

      7. Fai clic su Continua.
      8. Fai clic su Fine per completare la creazione del service account.

        Non chiudere la finestra del browser. Lo utilizzerai nel prossimo passaggio.

    6. Crea una chiave dell'account di servizio:

      1. Nella console Google Cloud , fai clic sull'indirizzo email del account di servizio che hai creato.
      2. Fai clic su Chiavi.
      3. Fai clic su Aggiungi chiave, poi su Crea nuova chiave.
      4. Fai clic su Crea. Un file della chiave JSON viene scaricato sul computer.
      5. Fai clic su Chiudi.
    7. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON contenente le credenziali. Questa variabile si applica solo alla sessione di shell corrente, quindi se apri una nuova sessione, imposta di nuovo la variabile.

    8. Installa Google Cloud CLI.

    9. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

    10. Per inizializzare gcloud CLI, esegui questo comando:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Abilita le API Dataproc, Compute Engine e Cloud Storage.

      Ruoli richiesti per abilitare le API

      Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

      Abilita le API

    14. Crea un service account:

      1. Assicurati di disporre del ruolo IAM Creazione account di servizio (roles/iam.serviceAccountCreator) e del ruolo Amministratore IAM progetto (roles/resourcemanager.projectIamAdmin). Scopri come concedere i ruoli.
      2. Nella console Google Cloud vai a Crea service account.

        Vai a Crea service account
      3. Seleziona il progetto.
      4. Nel campo Nome service account, inserisci un nome. La console Google Cloud compila il campo ID service account in base a questo nome.

        Nel campo Descrizione service account, inserisci una descrizione. Ad esempio, Service account for quickstart.

      5. Fai clic su Crea e continua.
      6. Concedi il ruolo Progetto > Proprietario al account di servizio.

        Per concedere il ruolo, trova l'elenco Seleziona un ruolo, quindi seleziona Progetto > Proprietario.

      7. Fai clic su Continua.
      8. Fai clic su Fine per completare la creazione del service account.

        Non chiudere la finestra del browser. Lo utilizzerai nel prossimo passaggio.

    15. Crea una chiave dell'account di servizio:

      1. Nella console Google Cloud , fai clic sull'indirizzo email del account di servizio che hai creato.
      2. Fai clic su Chiavi.
      3. Fai clic su Aggiungi chiave, poi su Crea nuova chiave.
      4. Fai clic su Crea. Un file della chiave JSON viene scaricato sul computer.
      5. Fai clic su Chiudi.
    16. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON contenente le credenziali. Questa variabile si applica solo alla sessione di shell corrente, quindi se apri una nuova sessione, imposta di nuovo la variabile.

    17. Installa Google Cloud CLI.

    18. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

    19. Per inizializzare gcloud CLI, esegui questo comando:

      gcloud init

  2. Crea un bucket Cloud Storage. Ti serve un bucket Cloud Storage per archiviare i dati del tutorial. Se non ne hai uno pronto all'uso, crea un nuovo bucket nel tuo progetto.

    1. Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.

      Vai a Bucket

    2. Fai clic su Crea.
    3. Nella pagina Crea un bucket, inserisci le informazioni del bucket. Per andare al passaggio successivo, fai clic su Continua.
      1. Nella sezione Inizia, segui questi passaggi:
      2. Nella sezione Scegli dove archiviare i tuoi dati, segui questi passaggi:
        1. Seleziona un Tipo di località.
        2. Scegli una posizione in cui i dati del bucket vengono archiviati in modo permanente dal menu a discesa Tipo di località.
          • Se selezioni il tipo di località a doppia regione, puoi anche scegliere di attivare la replica turbo utilizzando la casella di controllo pertinente.
        3. Per configurare la replica tra bucket, seleziona Aggiungi una replica tra bucket mediante Storage Transfer Service e segui questi passaggi:

          Configura la replica tra bucket

          1. Nel menu Bucket, seleziona un bucket.
          2. Nella sezione Impostazioni di replica, fai clic su Configura per configurare le impostazioni per il job di replica.

            Viene visualizzato il riquadro Configura replica tra bucket.

            • Per filtrare gli oggetti da replicare in base al prefisso del nome dell'oggetto, inserisci un prefisso da cui includere o escludere gli oggetti, quindi fai clic su Aggiungi un prefisso.
            • Per impostare una classe di archiviazione per gli oggetti replicati, seleziona una classe di archiviazione dal menu Classe di archiviazione. Se salti questo passaggio, gli oggetti replicati utilizzeranno per impostazione predefinita la classe di archiviazione del bucket di destinazione.
            • Fai clic su Fine.
      3. Nella sezione Scegli come archiviare i tuoi dati, segui questi passaggi:
        1. Seleziona una classe di archiviazione predefinita per il bucket o Autoclass per la gestione automatica della classe di archiviazione dei dati del bucket.
        2. Per attivare lo spazio dei nomi gerarchico, nella sezione Ottimizza l'archiviazione per workload con uso intensivo dei dati, seleziona Abilita uno spazio dei nomi gerarchico in questo bucket.
      4. Nella sezione Scegli come controllare l'accesso agli oggetti, seleziona se il bucket applica o meno la prevenzione dell'accesso pubblico e seleziona un metodo di controllo dell'accesso per gli oggetti del bucket.
      5. Nella sezione Scegli come proteggere i dati degli oggetti, segui questi passaggi:
        • Seleziona una delle opzioni in Protezione dei dati che vuoi impostare per il bucket.
          • Per attivare l'eliminazione temporanea, fai clic sulla casella di controllo Criterio di eliminazione temporanea (per il recupero dei dati) e specifica il numero di giorni per cui vuoi conservare gli oggetti dopo l'eliminazione.
          • Per impostare il controllo delle versioni degli oggetti, seleziona la casella di controllo Controllo delle versioni degli oggetti (per il controllo delle versioni) e specifica il numero massimo di versioni per oggetto e il numero di giorni dopo i quali scadono le versioni non correnti.
          • Per abilitare il criterio di conservazione su oggetti e bucket, seleziona la casella di controllo Conservazione (per la conformità), quindi procedi nel seguente modo:
            • Per attivare il blocco della conservazione degli oggetti, fai clic sulla casella di controllo Abilita conservazione degli oggetti.
            • Per attivare Bucket Lock, fai clic sulla casella di controllo Imposta criterio di conservazione del bucket e scegli un'unità di tempo e una durata per il periodo di conservazione.
        • Per scegliere come verranno criptati i dati degli oggetti, espandi la sezione Crittografia dei dati () e seleziona un metodo di crittografia dei dati.
    4. Fai clic su Crea.

  3. Imposta le variabili di ambiente locali. Imposta le variabili di ambiente sulla tua macchina locale. Imposta il tuo Google Cloud project-id e il nome del bucket Cloud Storage che utilizzerai per questo tutorial. Fornisci anche il nome e la regione di un cluster Dataproc esistente o nuovo. Puoi creare un cluster da utilizzare in questo tutorial nel passaggio successivo.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Crea un cluster Dataproc. Esegui il comando riportato di seguito per creare un cluster Dataproc a nodo singolo nella zona di Compute Engine specificata.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copia i dati pubblici nel tuo bucket Cloud Storage. Copia un frammento di testo di Shakespeare pubblico nella cartella input del tuo bucket Cloud Storage:

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configura un ambiente di sviluppo Java (Apache Maven), Scala (SBT) o Python.

Prepara il job Spark di conteggio parole

Seleziona una scheda di seguito per seguire i passaggi per preparare un pacchetto o un file di job da inviare al cluster. Puoi preparare uno dei seguenti tipi di lavoro:

Java

  1. Copia il file pom.xml nella macchina locale. Il seguente file pom.xml specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambito provided per indicare che il cluster Dataproc fornirà queste librerie in fase di runtime. Il file pom.xml non specifica una dipendenza da Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano con gs://), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Copia il codice WordCount.java elencato di seguito sul tuo computer locale.
    1. Crea un insieme di directory con il percorso src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copia WordCount.java sulla tua macchina locale in src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java è un job Spark in Java che legge file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Crea il pacchetto.
    mvn clean package
    
    Se la build ha esito positivo, viene creato un target/word-count-1.0.jar.
  4. Trasferisci il pacchetto a Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copia il file build.sbt nella macchina locale. Il seguente file build.sbt specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambito provided per indicare che il cluster Dataproc fornirà queste librerie in fase di runtime. Il file build.sbt non specifica una dipendenza da Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano con gs://), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Copia word-count.scala sulla tua macchina locale. Si tratta di un job Spark in Java che legge file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Crea il pacchetto.
    sbt clean package
    
    Se la build ha esito positivo, viene creato un target/scala-2.11/word-count_2.11-1.0.jar.
  4. Trasferisci il pacchetto a Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copia word-count.py sulla tua macchina locale. Si tratta di un job Spark in Python che utilizza PySpark per leggere i file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Invia il job

Esegui questo comando gcloud per inviare il job di conteggio parole al cluster Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Visualizzare l'output

Al termine del job, esegui questo comando gcloud CLI per visualizzare l'output del conteggio delle parole.

gcloud storage cat gs://${BUCKET_NAME}/output/*

L'output del conteggio delle parole dovrebbe essere simile al seguente:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Esegui la pulizia

Al termine del tutorial, puoi eliminare le risorse che hai creato in modo che non utilizzino più la quota generando addebiti. Le seguenti sezioni descrivono come eliminare o disattivare queste risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto creato per il tutorial.

Per eliminare il progetto:

  1. Nella console Google Cloud , vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona quello che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Elimina il cluster Dataproc

Anziché eliminare il progetto, potresti voler eliminare solo il cluster al suo interno.

Elimina il bucket Cloud Storage

ConsoleGoogle Cloud

  1. Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.

    Vai a Bucket

  2. Fai clic sulla casella di controllo del bucket da eliminare.
  3. Per eliminare il bucket, fai clic su Elimina, quindi segui le istruzioni.

Riga di comando

    Elimina il bucket:
    gcloud storage buckets delete BUCKET_NAME

Passaggi successivi