Use o conetor do BigQuery do Spark

O spark-bigquery-connector é usado com o Apache Spark para ler e escrever dados de e para o BigQuery. O conetor tira partido da API BigQuery Storage ao ler dados do BigQuery.

Este tutorial fornece informações sobre a disponibilidade do conetor pré-instalado e mostra como disponibilizar uma versão específica do conetor para tarefas do Spark. O código de exemplo mostra como usar o conetor do BigQuery do Spark numa aplicação do Spark.

Use o conetor pré-instalado

O conetor do BigQuery do Spark está pré-instalado e disponível para tarefas do Spark executadas em clusters do Dataproc criados com versões de imagens 2.1 e posteriores. A versão do conetor pré-instalada é apresentada na página de lançamento de cada versão de imagem. Por exemplo, a linha BigQuery Connector na página 2.2.x image release versions mostra a versão do conetor que está instalada nas versões de imagens 2.2 mais recentes.

Disponibilize uma versão específica do conetor para tarefas do Spark

Se quiser usar uma versão do conetor diferente de uma versão pré-instalada num cluster de versão de imagem 2.1 ou posterior, ou se quiser instalar o conetor num cluster de versão de imagem pré-2.1, siga as instruções nesta secção.

Importante: a versão spark-bigquery-connector tem de ser compatível com a versão da imagem do cluster do Dataproc. Consulte a matriz de compatibilidade de imagens do conector para o Dataproc.

2.1 e clusters de versões de imagens posteriores

Quando cria um cluster do Dataproc com uma versão da imagem 2.1 ou posterior, especifique a versão do conetor como metadados do cluster.

Exemplo da CLI gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

Notas:

  • SPARK_BQ_CONNECTOR_VERSION: especifique uma versão do conetor. As versões do conetor do BigQuery do Spark estão listadas na página spark-bigquery-connector/releases no GitHub.

    Exemplo:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: especifique um URL que aponte para o JAR no Cloud Storage. Pode especificar o URL de um conetor indicado na coluna link em Transferir e usar o conetor no GitHub ou o caminho para uma localização do Cloud Storage onde tenha colocado um JAR de conetor personalizado.

    Exemplos:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

2.0 e clusters de versões de imagens anteriores

Pode disponibilizar o conetor do BigQuery do Spark à sua aplicação de uma das seguintes formas:

  1. Instale o spark-bigquery-connector no diretório de ficheiros JAR do Spark de cada nó através da ação de inicialização dos conetores do Dataproc quando criar o cluster.

  2. Forneça o URL do JAR do conector quando enviar a tarefa para o cluster através da consola, da CLI gcloud ou da API Dataproc. Google Cloud

    Consola

    Use o item Ficheiros JARs da tarefa Spark na página Enviar uma tarefa do Dataproc.

    gcloud

    Use a flag gcloud dataproc jobs submit spark --jars.

    API

    Use o campo SparkJob.jarFileUris.

    Como especificar o JAR do conetor ao executar tarefas do Spark em clusters com a versão de imagem anterior à 2.0

    • Especifique o JAR do conetor substituindo as informações da versão do conetor e do Scala na seguinte string do URI:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • Use o Scala 2.12 com as versões de imagens do Dataproc 1.5+
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      Exemplo da CLI gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • Use o Scala 2.11 com as versões de imagem do Dataproc 1.4 e anteriores:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      Exemplo da CLI gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. Inclua o JAR do conetor na sua aplicação Scala ou Java Spark como uma dependência (consulte Compilar em função do conetor).

Calcule os custos

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Ler e escrever dados do e para o BigQuery

Este exemplo lê dados do BigQuery num DataFrame do Spark para fazer uma contagem de palavras através da API de origem de dados padrão.

O conetor escreve os dados no BigQuery ao armazenar primeiro todos os dados numa tabela temporária do Cloud Storage. Em seguida, copia todos os dados para o BigQuery numa única operação. O conector tenta eliminar os ficheiros temporários assim que a operação de carregamento do BigQuery for bem-sucedida e novamente quando a aplicação Spark terminar. Se a tarefa falhar, remova todos os ficheiros temporários restantes do Google Cloud Storage. Normalmente, os ficheiros temporários do BigQuery estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Configure a faturação

Por predefinição, o projeto associado às credenciais ou à conta de serviço é faturado pela utilização da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Também pode ser adicionado a uma operação de leitura ou escrita, da seguinte forma: .option("parentProject", "<BILLED-GCP-PROJECT>").

Executar o código

Antes de executar este exemplo, crie um conjunto de dados denominado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados do BigQuery existente no seu projetoGoogle Cloud .

Use o comando bq para criar o wordcount_dataset:

bq mk wordcount_dataset

Use o comando da CLI do Google Cloud para criar um contentor do Cloud Storage, que vai ser usado para exportar para o BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Examine o código e substitua o marcador de posição [bucket] pelo contentor do Cloud Storage que criou anteriormente.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      spark.read.bigquery("bigquery-public-data:samples.shakespeare")
      .cache()
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .save("wordcount_dataset.wordcount_output"))
  2. Execute o código no seu cluster
    1. Use o SSH para se ligar ao nó principal do cluster do Dataproc
      1. Aceda à página Clusters do Dataproc na Google Cloud consola e, de seguida, clique no nome do cluster Página de clusters do Dataproc na Cloud Console.
      2. Na página >Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em SSH à direita do nome do nó principal do cluster> Página de detalhes do cluster do Dataproc na Cloud Console.
        É aberta uma janela do navegador no seu diretório base no nó principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.scala com o editor de texto vi, vim ou nano pré-instalado e, em seguida, cole o código Scala do exemplo de código Scala
      nano wordcount.scala
        
    3. Inicie o spark-shell REPL.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Execute o wordcount.scala com o comando :load wordcount.scala para criar a tabela wordcount_output do BigQuery. A saída da listagem apresenta 20 linhas da saída da contagem de palavras.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para pré-visualizar a tabela de saída, abra a página BigQuery , selecione a tabela wordcount_output e, de seguida, clique em Pré-visualizar. Pré-visualize a tabela na página do BigQuery Explorer na Cloud Console.

PySpark

  1. Examine o código e substitua o marcador de posição [bucket] pelo contentor do Cloud Storage que criou anteriormente.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data:samples.shakespeare') \
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  2. Execute o código no cluster
    1. Use o SSH para se ligar ao nó principal do cluster do Dataproc
      1. Aceda à página Clusters do Dataproc na Google Cloud consola e, de seguida, clique no nome do cluster Página Clusters na Cloud Console.
      2. Na página Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em SSH à direita do nome do nó principal do cluster Selecione SSH na linha do nome do cluster na página de detalhes do cluster na Cloud Console.
        É aberta uma janela do navegador no seu diretório base no nó principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.py com o editor de texto vi, vim ou nano pré-instalado e, em seguida, cole o código PySpark da Ficha de código PySpark
      nano wordcount.py
      
    3. Execute o wordcount com spark-submit para criar a tabela do BigQuery wordcount_output. A listagem de saída apresenta 20 linhas da saída de contagem de palavras.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para pré-visualizar a tabela de saída, abra a página BigQuery , selecione a tabela wordcount_output e, de seguida, clique em Pré-visualizar. Pré-visualize a tabela na página do BigQuery Explorer na Cloud Console.

Sugestões de resolução de problemas

Pode examinar os registos de tarefas no Cloud Logging e no Jobs Explorer do BigQuery para resolver problemas de tarefas do Spark que usam o conetor do BigQuery.

  • Os registos do controlador do Dataproc contêm uma entrada BigQueryClient com metadados do BigQuery que incluem o jobId:

    ClassNotFoundException INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION}
    
  • As tarefas do BigQuery contêm etiquetas Dataproc_job_id e Dataproc_job_uuid:

    • Registo:
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID"
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID"
      protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
      
    • Explorador de tarefas do BigQuery: clique num ID da tarefa para ver os detalhes da tarefa em Etiquetas em Informações da tarefa.

O que se segue?