O spark-bigquery-connector é usado com o Apache Spark para ler e escrever dados de e para o BigQuery. O conetor tira partido da API BigQuery Storage ao ler dados do BigQuery.
Este tutorial fornece informações sobre a disponibilidade do conetor pré-instalado e mostra como disponibilizar uma versão específica do conetor para tarefas do Spark. O código de exemplo mostra como usar o conetor do BigQuery do Spark numa aplicação do Spark.
Use o conetor pré-instalado
O conetor do BigQuery do Spark está pré-instalado e disponível para tarefas do Spark executadas em clusters do Dataproc criados com versões de imagens 2.1
e posteriores. A versão do conetor pré-instalada é apresentada na página de lançamento de cada versão de imagem. Por exemplo, a linha BigQuery Connector na página
2.2.x image release versions
mostra a versão do conetor que está instalada nas versões de imagens 2.2 mais recentes.
Disponibilize uma versão específica do conetor para tarefas do Spark
Se quiser usar uma versão do conetor diferente de uma versão pré-instalada num cluster de versão de imagem 2.1
ou posterior, ou se quiser instalar o conetor num cluster de versão de imagem pré-2.1
, siga as instruções nesta secção.
Importante: a versão spark-bigquery-connector
tem de ser compatível com a versão da imagem do cluster do Dataproc. Consulte a matriz de compatibilidade de imagens do conector para o Dataproc.
2.1
e clusters de versões de imagens posteriores
Quando cria um cluster do Dataproc
com uma versão da imagem 2.1
ou posterior, especifique a versão do conetor como metadados do cluster.
Exemplo da CLI gcloud:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
Notas:
SPARK_BQ_CONNECTOR_VERSION: especifique uma versão do conetor. As versões do conetor do BigQuery do Spark estão listadas na página spark-bigquery-connector/releases no GitHub.
Exemplo:
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL: especifique um URL que aponte para o JAR no Cloud Storage. Pode especificar o URL de um conetor indicado na coluna link em Transferir e usar o conetor no GitHub ou o caminho para uma localização do Cloud Storage onde tenha colocado um JAR de conetor personalizado.
Exemplos:
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
2.0
e clusters de versões de imagens anteriores
Pode disponibilizar o conetor do BigQuery do Spark à sua aplicação de uma das seguintes formas:
Instale o spark-bigquery-connector no diretório de ficheiros JAR do Spark de cada nó através da ação de inicialização dos conetores do Dataproc quando criar o cluster.
Forneça o URL do JAR do conector quando enviar a tarefa para o cluster através da consola, da CLI gcloud ou da API Dataproc. Google Cloud
Consola
Use o item Ficheiros JARs da tarefa Spark na página Enviar uma tarefa do Dataproc.
gcloud
Use a flag
gcloud dataproc jobs submit spark --jars
.API
Use o campo
SparkJob.jarFileUris
.Como especificar o JAR do conetor ao executar tarefas do Spark em clusters com a versão de imagem anterior à 2.0
- Especifique o JAR do conetor substituindo as informações da versão do conetor e do Scala na seguinte string do URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- Use o Scala
2.12
com as versões de imagens do Dataproc1.5+
Exemplo da CLI gcloud:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- Use o Scala
2.11
com as versões de imagem do Dataproc1.4
e anteriores: Exemplo da CLI gcloud:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- Especifique o JAR do conetor substituindo as informações da versão do conetor e do Scala na seguinte string do URI:
Inclua o JAR do conetor na sua aplicação Scala ou Java Spark como uma dependência (consulte Compilar em função do conetor).
Calcule os custos
Neste documento, usa os seguintes componentes faturáveis do Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Para gerar uma estimativa de custos com base na sua utilização projetada,
use a calculadora de preços.
Ler e escrever dados do e para o BigQuery
Este exemplo lê dados do BigQuery num DataFrame do Spark para fazer uma contagem de palavras através da API de origem de dados padrão.
O conetor escreve os dados no BigQuery ao
armazenar primeiro todos os dados numa tabela temporária do Cloud Storage. Em seguida, copia todos os dados para o BigQuery numa única operação. O conector tenta eliminar os ficheiros temporários assim que a operação de carregamento do BigQuery for bem-sucedida e novamente quando a aplicação Spark terminar.
Se a tarefa falhar, remova todos os ficheiros temporários restantes do
Google Cloud Storage. Normalmente, os ficheiros temporários do BigQuery
estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Configure a faturação
Por predefinição, o projeto associado às credenciais ou à conta de serviço é faturado pela utilização da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Também pode ser adicionado a uma operação de leitura ou escrita, da seguinte forma:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Executar o código
Antes de executar este exemplo, crie um conjunto de dados denominado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados do BigQuery existente no seu projetoGoogle Cloud .
Use o comando bq para criar o wordcount_dataset
:
bq mk wordcount_dataset
Use o comando da CLI do Google Cloud para criar um contentor do Cloud Storage, que vai ser usado para exportar para o BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Examine o código e substitua o marcador de posição [bucket] pelo contentor do Cloud Storage que criou anteriormente.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare") .cache() wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .save("wordcount_dataset.wordcount_output"))
- Execute o código no seu cluster
- Use o SSH para se ligar ao nó principal do cluster do Dataproc
- Aceda à página
Clusters do Dataproc
na Google Cloud consola e, de seguida, clique no nome do cluster
- Na página >Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em
SSH
à direita do nome do nó principal do cluster>
É aberta uma janela do navegador no seu diretório base no nó principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Aceda à página
Clusters do Dataproc
na Google Cloud consola e, de seguida, clique no nome do cluster
- Crie
wordcount.scala
com o editor de textovi
,vim
ounano
pré-instalado e, em seguida, cole o código Scala do exemplo de código Scalanano wordcount.scala
- Inicie o
spark-shell
REPL.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Execute o wordcount.scala com o comando
:load wordcount.scala
para criar a tabelawordcount_output
do BigQuery. A saída da listagem apresenta 20 linhas da saída da contagem de palavras.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para pré-visualizar a tabela de saída, abra a páginaBigQuery
, selecione a tabelawordcount_output
e, de seguida, clique em Pré-visualizar.
- Use o SSH para se ligar ao nó principal do cluster do Dataproc
PySpark
- Examine o código e substitua o marcador de posição [bucket] pelo contentor do Cloud Storage que criou anteriormente.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .load('bigquery-public-data:samples.shakespeare') \ words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .save('wordcount_dataset.wordcount_output')
- Execute o código no cluster
- Use o SSH para se ligar ao nó principal do cluster do Dataproc
- Aceda à página
Clusters do Dataproc
na Google Cloud consola e, de seguida, clique no nome do cluster
- Na página Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em
SSH
à direita do nome do nó principal do cluster
É aberta uma janela do navegador no seu diretório base no nó principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Aceda à página
Clusters do Dataproc
na Google Cloud consola e, de seguida, clique no nome do cluster
- Crie
wordcount.py
com o editor de textovi
,vim
ounano
pré-instalado e, em seguida, cole o código PySpark da Ficha de código PySparknano wordcount.py
- Execute o wordcount com
spark-submit
para criar a tabela do BigQuerywordcount_output
. A listagem de saída apresenta 20 linhas da saída de contagem de palavras.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para pré-visualizar a tabela de saída, abra a páginaBigQuery
, selecione a tabelawordcount_output
e, de seguida, clique em Pré-visualizar.
- Use o SSH para se ligar ao nó principal do cluster do Dataproc
Sugestões de resolução de problemas
Pode examinar os registos de tarefas no Cloud Logging e no Jobs Explorer do BigQuery para resolver problemas de tarefas do Spark que usam o conetor do BigQuery.
Os registos do controlador do Dataproc contêm uma entrada
BigQueryClient
com metadados do BigQuery que incluem ojobId
:ClassNotFoundException
INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION} As tarefas do BigQuery contêm etiquetas
Dataproc_job_id
eDataproc_job_uuid
:- Registo:
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID" protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID" protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
- Explorador de tarefas do BigQuery: clique num ID da tarefa para ver os detalhes da tarefa em Etiquetas em Informações da tarefa.
- Registo:
O que se segue?
- Consulte o artigo BigQuery Storage e Spark SQL – Python.
- Saiba como criar um ficheiro de definição de tabela para uma origem de dados externa.
- Saiba como consultar dados particionados externamente.
- Veja sugestões de otimização de tarefas do Spark.