Usar o conector do Cloud Storage com o Apache Spark

Este tutorial mostra como executar um código de exemplo que usa o conector do Cloud Storage com o Apache Spark.

Objetivos

Escreva um job simples de contagem de palavras do Spark em Java, Scala ou Python e execute-o em um cluster do Dataproc.

Custos

Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.

Novos usuários do Google Cloud podem estar qualificados para um teste sem custo financeiro.

Antes de começar

Execute as etapas abaixo para se preparar para executar o código neste tutorial.

  1. Criar o projeto. Se necessário, configure um projeto com as APIs Dataproc, Compute Engine e Cloud Storage ativadas e a Google Cloud CLI instalada na máquina local.

    1. Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. Instale a CLI do Google Cloud.

    9. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

    10. Para inicializar a gcloud CLI, execute o seguinte comando:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    14. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. Instale a CLI do Google Cloud.

    18. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

    19. Para inicializar a gcloud CLI, execute o seguinte comando:

      gcloud init

  2. Criar um bucket do Cloud Storage Você precisa do Cloud Storage para armazenar os dados do tutorial. Se você não tiver um pronto para usá-lo, crie um novo bucket no projeto.

    1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

      Acessar buckets

    2. Clique em Criar.
    3. Na página Criar um bucket, insira as informações do seu bucket. Para ir à próxima etapa, clique em Continuar.
      1. Na seção Começar, faça o seguinte:
      2. Na seção Escolha onde armazenar seus dados, faça o seguinte:
        1. Selecione um tipo de local.
        2. Escolha um local onde os dados do bucket são armazenados permanentemente no menu suspenso Tipo de local.
        3. Para configurar a replicação entre buckets, selecione Adicionar replicação entre buckets usando o Serviço de transferência do Cloud Storage e siga estas etapas:

          Configurar a replicação entre buckets

          1. No menu Bucket, selecione um bucket.
          2. Na seção Configurações de replicação, clique em Configurar para definir as configurações do job de replicação.

            O painel Configurar a replicação entre buckets aparece.

            • Para filtrar objetos a serem replicados por prefixo de nome de objeto, insira um prefixo com que você quer incluir ou excluir objetos e clique em Adicionar um prefixo.
            • Para definir uma classe de armazenamento para os objetos replicados, selecione uma classe de armazenamento no menu Classe de armazenamento. Se você pular esta etapa, os objetos replicados vão usar a classe de armazenamento do bucket de destino por padrão.
            • Clique em Concluído.
      3. Na seção Escolha como armazenar seus dados, faça o seguinte:
        1. Selecione uma classe de armazenamento padrão para o bucket ou Classe automática para gerenciamento automático da classe de armazenamento dos dados do bucket.
        2. Para ativar o namespace hierárquico, na seção Otimizar o armazenamento para cargas de trabalho com uso intensivo de dados, selecione Ativar namespace hierárquico neste bucket.
      4. Na seção Escolha como controlar o acesso a objetos, selecione se o bucket aplica ou não a prevenção de acesso público e selecione um método de controle de acesso para os objetos do bucket.
      5. Na seção Escolha como proteger os dados do objeto, faça o seguinte:
        • Selecione qualquer uma das opções em Proteção de dados que você quer definir para o bucket.
          • Para ativar a exclusão reversível, clique na caixa de seleção Política de exclusão reversível (para recuperação de dados) e especifique o número de dias que você quer reter os objetos após a exclusão.
          • Para definir o controle de versões de objetos, clique na caixa de seleção Controle de versões de objetos (para controle de versões) e especifique o número máximo de versões por objeto e o número de dias após os quais as versões não atuais expiram.
          • Para ativar a política de retenção em objetos e buckets, clique na caixa de seleção Retenção (para compliance) e faça o seguinte:
            • Para ativar o bloqueio de retenção de objetos, clique na caixa de seleção Ativar retenção de objetos.
            • Para ativar o Bloqueio de buckets, clique na caixa de seleção Definir política de retenção de buckets e escolha uma unidade e um período de armazenamento para a retenção.
        • Para escolher como os dados do objeto serão criptografados, expanda a seção Criptografia de dados () e selecione um método de Criptografia de dados.
    4. Clique em Criar.

  3. Defina variáveis de ambiente locais. Defina variáveis de ambiente na máquina local. Defina o ID do projeto Google Cloud e o nome do bucket do Cloud Storage que você vai usar neste tutorial. Forneça também o nome e a região de um cluster novo ou existente do Dataproc. Você pode criar um cluster para usar neste tutorial na próxima etapa.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Criar um cluster de Dataproc. Execute o comando abaixo para criar um cluster do Dataproc de nó único na zona do Compute Engine especificada.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copie dados públicos para o bucket do Cloud Storage. Copie um snippet de um texto de Shakespeare de domínio público para a pasta input do bucket do Cloud Storage:

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configure um ambiente de desenvolvimento Java (Apache Maven), Scala (SBT) ou Python.

Preparar o job de contagem de palavras do Spark

Selecione uma guia abaixo para seguir as etapas e preparar um pacote ou arquivo de job para enviar ao cluster. Você pode preparar um dos seguintes tipos de job:

Java

  1. Copie o arquivo pom.xml para sua máquina local. O arquivo pom.xml a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo pom.xml não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Copie o código WordCount.java listado abaixo para sua máquina local.
    1. Crie um conjunto de diretórios com o caminho src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copie WordCount.java para sua máquina local em src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java é um job do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Criar o pacote.
    mvn clean package
    
    Se a build for bem-sucedida, um target/word-count-1.0.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copie o arquivo build.sbt para sua máquina local. O arquivo build.sbt a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo build.sbt não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Copie word-count.scala para sua máquina local. Ele é um job do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Criar o pacote.
    sbt clean package
    
    Se a build for bem-sucedida, um target/scala-2.11/word-count_2.11-1.0.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copie word-count.py para sua máquina local. Ele é um job do Spark em Python usando PySpark que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Enviar o job

Execute o comando gcloud a seguir para enviar o job de contagem de palavras ao cluster do Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Veja o resultado

Após a conclusão do job, execute o seguinte comando da CLI gcloud para conferir a saída de contagem de palavras.

gcloud storage cat gs://${BUCKET_NAME}/output/*

O resultado da contagem de palavras deve ser semelhante a este:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Limpar

Depois de concluir o tutorial, você pode limpar os recursos que criou para que eles parem de usar a cota e gerar cobranças. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

Exclua o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto:

  1. No console Google Cloud , acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Exclua o cluster do Dataproc

Em vez de excluir o projeto, convém excluir o cluster dentro do projeto.

Excluir o bucket do Cloud Storage

Console doGoogle Cloud

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.

Linha de comando

    Exclua o bucket:
    gcloud storage buckets delete BUCKET_NAME

A seguir