Use o conetor do BigQuery com o Google Cloud sem servidor para o Apache Spark

Use o spark-bigquery-connector com o Apache Spark para ler e escrever dados de e para o BigQuery. Este tutorial demonstra uma aplicação PySpark que usa o spark-bigquery-connector.

Use o conetor do BigQuery com a sua carga de trabalho

Consulte o artigo Versões de lançamento do tempo de execução sem servidor para o Apache Spark para determinar a versão do conetor do BigQuery que está instalada na versão do tempo de execução da carga de trabalho em lote. Se o conector não for apresentado, consulte a secção seguinte para ver instruções sobre como disponibilizar o conector às aplicações.

Como usar o conetor com a versão 2.0 do tempo de execução do Spark

O conetor do BigQuery não está instalado na versão 2.0 do tempo de execução do Spark. Quando usa a versão 2.0 do tempo de execução do Spark, pode disponibilizar o conector à sua aplicação de uma das seguintes formas:

  • Use o parâmetro jars para apontar para um ficheiro JAR do conetor quando enviar a sua Google Cloud carga de trabalho em lote sem servidor para o Apache Spark Google Cloud . O exemplo seguinte especifica um ficheiro JAR do conetor (consulte o repositório GoogleCloudDataproc/spark-bigquery-connector no GitHub para ver uma lista dos ficheiros JAR do conetor disponíveis).
    • Exemplo da CLI do Google Cloud:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Inclua o ficheiro JAR do conetor na sua aplicação Spark como uma dependência (consulte Compilar em função do conetor)

Calcule os custos

Este tutorial usa componentes faturáveis do Google Cloud, incluindo:

  • Serverless para Apache Spark
  • BigQuery
  • Cloud Storage

Use a calculadora de preços para gerar uma estimativa de custo com base na sua utilização prevista.

Os novos utilizadores da Cloud Platform podem ser elegíveis para uma avaliação gratuita.

BigQuery I/O

Este exemplo lê dados do BigQuery num DataFrame do Spark para fazer uma contagem de palavras através da API de origem de dados padrão.

O conector escreve a saída da contagem de palavras no BigQuery da seguinte forma:

  1. Armazenar os dados em buffer em ficheiros temporários no seu contentor do Cloud Storage

  2. Copiar os dados numa operação do seu contentor do Cloud Storage para o BigQuery

  3. Eliminar os ficheiros temporários no armazenamento na nuvem após a conclusão da operação de carregamento do BigQuery (os ficheiros temporários também são eliminados após o encerramento da aplicação Spark). Se a eliminação falhar, tem de eliminar todos os ficheiros temporários indesejados do Cloud Storage, que normalmente são colocados em gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.

Configure a faturação

Por predefinição, o projeto associado às credenciais ou à conta de serviço é faturado pela utilização da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Também pode adicionar a uma operação de leitura ou escrita, da seguinte forma: .option("parentProject", "<BILLED-GCP-PROJECT>").

Envie uma carga de trabalho em lote de contagem de palavras do PySpark

Execute um fluxo de trabalho em lote do Spark que conte o número de palavras num conjunto de dados público.

  1. Abra um terminal local ou o Cloud Shell
  2. Crie o wordcount_dataset com a ferramenta de linhas de comando bq num terminal local ou no Cloud Shell.
    bq mk wordcount_dataset
    
  3. Crie um contentor do Cloud Storage com a CLI do Google Cloud.
    gcloud storage buckets create gs://YOUR_BUCKET
    
    Substitua YOUR_BUCKET pelo nome do contentor do Cloud Storage que criou.
  4. Crie o ficheiro wordcount.py localmente num editor de texto copiando o seguinte código PySpark.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. Envie a carga de trabalho em lote do PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    Exemplo de saída do terminal:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Para pré-visualizar a tabela de saída na Google Cloud consola, abra a página do seu projeto no BigQuery , selecione a tabela wordcount_output e, de seguida, clique em Pré-visualizar.

Para obter mais informações