Use o spark-bigquery-connector
com o Apache Spark
para ler e escrever dados de e para o BigQuery.
Este tutorial demonstra uma aplicação PySpark que usa o
spark-bigquery-connector
.
Use o conetor do BigQuery com a sua carga de trabalho
Consulte o artigo Versões de lançamento do tempo de execução sem servidor para o Apache Spark para determinar a versão do conetor do BigQuery que está instalada na versão do tempo de execução da carga de trabalho em lote. Se o conector não for apresentado, consulte a secção seguinte para ver instruções sobre como disponibilizar o conector às aplicações.
Como usar o conetor com a versão 2.0 do tempo de execução do Spark
O conetor do BigQuery não está instalado na versão 2.0 do tempo de execução do Spark. Quando usa a versão 2.0 do tempo de execução do Spark, pode disponibilizar o conector à sua aplicação de uma das seguintes formas:
- Use o parâmetro
jars
para apontar para um ficheiro JAR do conetor quando enviar a sua Google Cloud carga de trabalho em lote sem servidor para o Apache Spark Google Cloud . O exemplo seguinte especifica um ficheiro JAR do conetor (consulte o repositório GoogleCloudDataproc/spark-bigquery-connector no GitHub para ver uma lista dos ficheiros JAR do conetor disponíveis).- Exemplo da CLI do Google Cloud:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Exemplo da CLI do Google Cloud:
- Inclua o ficheiro JAR do conetor na sua aplicação Spark como uma dependência (consulte Compilar em função do conetor)
Calcule os custos
Este tutorial usa componentes faturáveis do Google Cloud, incluindo:
- Serverless para Apache Spark
- BigQuery
- Cloud Storage
Use a calculadora de preços para gerar uma estimativa de custo com base na sua utilização prevista.
BigQuery I/O
Este exemplo lê dados do BigQuery num DataFrame do Spark para fazer uma contagem de palavras através da API de origem de dados padrão.
O conector escreve a saída da contagem de palavras no BigQuery da seguinte forma:
Armazenar os dados em buffer em ficheiros temporários no seu contentor do Cloud Storage
Copiar os dados numa operação do seu contentor do Cloud Storage para o BigQuery
Eliminar os ficheiros temporários no armazenamento na nuvem após a conclusão da operação de carregamento do BigQuery (os ficheiros temporários também são eliminados após o encerramento da aplicação Spark). Se a eliminação falhar, tem de eliminar todos os ficheiros temporários indesejados do Cloud Storage, que normalmente são colocados em
gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID
.
Configure a faturação
Por predefinição, o projeto associado às credenciais ou à conta de serviço é faturado pela utilização da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Também pode adicionar a uma operação de leitura ou escrita, da seguinte forma:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Envie uma carga de trabalho em lote de contagem de palavras do PySpark
Execute um fluxo de trabalho em lote do Spark que conte o número de palavras num conjunto de dados público.
- Abra um terminal local ou o Cloud Shell
- Crie o
wordcount_dataset
com a ferramenta de linhas de comando bq num terminal local ou no Cloud Shell.bq mk wordcount_dataset
- Crie um contentor do Cloud Storage com a CLI do Google Cloud.
Substituagcloud storage buckets create gs://YOUR_BUCKET
YOUR_BUCKET
pelo nome do contentor do Cloud Storage que criou. - Crie o ficheiro
wordcount.py
localmente num editor de texto copiando o seguinte código PySpark.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Envie a carga de trabalho em lote do PySpark:
Exemplo de saída do terminal:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=YOUR_BUCKET
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para pré-visualizar a tabela de saída na Google Cloud consola, abra a página do seu projeto no BigQuery , selecione a tabelawordcount_output
e, de seguida, clique em Pré-visualizar.
Para obter mais informações
- BigQuery Storage e Spark SQL – Python
- Criar um ficheiro de definição de tabela para uma origem de dados externa
- Use dados particionados externamente