O conetor do BigQuery para Apache Spark permite que os cientistas de dados combinem o poder do motor SQL perfeitamente escalável do BigQuery com as capacidades de aprendizagem automática do Apache Spark. Neste tutorial, mostramos como usar o Dataproc, o BigQuery e o Apache Spark ML para realizar a aprendizagem automática num conjunto de dados.
Crie um subconjunto de dados de natalidade do BigQuery
Nesta secção, cria um conjunto de dados no seu projeto e, em seguida, cria uma tabela no conjunto de dados para o qual copia um subconjunto de dados da taxa de natalidade do conjunto de dados do BigQuery natality disponível publicamente. Mais adiante neste tutorial, vai usar os dados do subconjunto nesta tabela para prever o peso ao nascer como uma função da idade materna, da idade paterna e das semanas de gestação.
Pode criar o subconjunto de dados através da Google Cloud consola ou executando um script Python no seu computador local.
Consola
Crie um conjunto de dados no seu projeto.
- Aceda à IU Web do BigQuery.
- No painel de navegação do lado esquerdo, clique no nome do projeto e, de seguida, em CRIAR CONJUNTO DE DADOS.
- Na caixa de diálogo Criar conjunto de dados:
- Para ID do conjunto de dados, introduza "natality_regression".
- Para Localização de dados, pode escolher uma
localização
para o conjunto de dados. A localização do valor predefinido é
US multi-region
. Após a criação de um conjunto de dados, não é possível alterar a localização. - Em Validade predefinida da tabela, escolha uma das seguintes opções:
- Nunca (predefinição): tem de eliminar a tabela manualmente.
- Número de dias: a tabela é eliminada após o número de dias especificado a partir da hora de criação.
- Para Encriptação, escolha uma das seguintes opções:
- Google-owned and Google-managed encryption key (predefinição).
- Chave gerida pelo cliente: consulte o artigo Proteger dados com chaves do Cloud KMS.
- Clique em Criar conjunto de dados.
Execute uma consulta no conjunto de dados público de natalidade e, em seguida, guarde os resultados da consulta numa nova tabela no seu conjunto de dados.
- Copie e cole a seguinte consulta no editor de consultas e, de seguida,
clique em Executar.
CREATE OR REPLACE TABLE natality_regression.regression_input as SELECT weight_pounds, mother_age, father_age, gestation_weeks, weight_gain_pounds, apgar_5min FROM `bigquery-public-data.samples.natality` WHERE weight_pounds IS NOT NULL AND mother_age IS NOT NULL AND father_age IS NOT NULL AND gestation_weeks IS NOT NULL AND weight_gain_pounds IS NOT NULL AND apgar_5min IS NOT NULL
- Após a conclusão da consulta (em aproximadamente um minuto), os resultados são guardados como a tabela do BigQuery "regression_input" no conjunto de dados
natality_regression
no seu projeto.
- Copie e cole a seguinte consulta no editor de consultas e, de seguida,
clique em Executar.
Python
Antes de experimentar este exemplo, siga as Pythoninstruções de configuração no início rápido do Dataproc com as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python Dataproc.
Para se autenticar no Dataproc, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Consulte o artigo Configurar um ambiente de desenvolvimento Python para ver instruções sobre a instalação do Python e da biblioteca cliente Google Cloud para Python (necessária para executar o código). Recomendamos a instalação e a utilização de um Python
virtualenv
.Copie e cole o código
natality_tutorial.py
abaixo numa shellpython
na sua máquina local. Prima a tecla<return>
na shell para executar o código para criar um conjunto de dados do BigQuery "natality_regression" no seu projetoGoogle Cloud predefinido com uma tabela "regression_input" preenchida com um subconjunto dos dadosnatality
públicos.Confirme a criação do conjunto de dados
natality_regression
e da tabelaregression_input
.
Execute uma regressão linear
Nesta secção, vai executar uma regressão linear do PySpark enviando
a tarefa para o serviço Dataproc através da Google Cloud consola
ou executando o comando gcloud
a partir de um terminal local.
Consola
Copie e cole o seguinte código num novo
natality_sparkml.py
ficheiro no seu computador local."""Run a linear regression using Apache Spark ML. In the following PySpark (Spark Python API) code, we take the following actions: * Load a previously created linear regression (BigQuery) input table into our Cloud Dataproc Spark cluster as an RDD (Resilient Distributed Dataset) * Transform the RDD into a Spark Dataframe * Vectorize the features on which the model will be trained * Compute a linear regression using Spark ML """ from pyspark.context import SparkContext from pyspark.ml.linalg import Vectors from pyspark.ml.regression import LinearRegression from pyspark.sql.session import SparkSession # The imports, above, allow us to access SparkML features specific to linear # regression as well as the Vectors types. # Define a function that collects the features of interest # (mother_age, father_age, and gestation_weeks) into a vector. # Package the vector in a tuple containing the label (`weight_pounds`) for that # row. def vector_from_inputs(r): return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]), float(r["father_age"]), float(r["gestation_weeks"]), float(r["weight_gain_pounds"]), float(r["apgar_5min"]))) sc = SparkContext() spark = SparkSession(sc) # Read the data from BigQuery as a Spark Dataframe. natality_data = spark.read.format("bigquery").option( "table", "natality_regression.regression_input").load() # Create a view so that Spark SQL queries can be run against the data. natality_data.createOrReplaceTempView("natality") # As a precaution, run a query in Spark SQL to ensure no NULL values exist. sql_query = """ SELECT * from natality where weight_pounds is not null and mother_age is not null and father_age is not null and gestation_weeks is not null """ clean_data = spark.sql(sql_query) # Create an input DataFrame for Spark ML using the above function. training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label", "features"]) training_data.cache() # Construct a new LinearRegression object and fit the training data. lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal") model = lr.fit(training_data) # Print the model summary. print("Coefficients:" + str(model.coefficients)) print("Intercept:" + str(model.intercept)) print("R^2:" + str(model.summary.r2)) model.summary.residuals.show()
Copie o ficheiro
natality_sparkml.py
local para um contentor do Cloud Storage no seu projeto.gcloud storage cp natality_sparkml.py gs://bucket-name
Execute a regressão a partir da página Enviar uma tarefa do Dataproc.
No campo Ficheiro Python principal, insira o URI do contentor do Cloud Storage onde se encontra a sua cópia do ficheiro
natality_sparkml.py
.gs://
Selecione
PySpark
como o Tipo de serviço.Insira
gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
no campo Ficheiros JAR. Isto torna o spark-bigquery-connector disponível para a aplicação PySpark no momento da execução, o que lhe permite ler dados do BigQuery num DataFrame do Spark.Preencha os campos ID da tarefa, Região e Cluster.
Clique em Enviar para executar a tarefa no seu cluster.
Quando a tarefa estiver concluída, o resumo do modelo de saída de regressão linear é apresentado na janela de detalhes da tarefa do Dataproc.

gcloud
Copie e cole o seguinte código num novo
natality_sparkml.py
ficheiro no seu computador local."""Run a linear regression using Apache Spark ML. In the following PySpark (Spark Python API) code, we take the following actions: * Load a previously created linear regression (BigQuery) input table into our Cloud Dataproc Spark cluster as an RDD (Resilient Distributed Dataset) * Transform the RDD into a Spark Dataframe * Vectorize the features on which the model will be trained * Compute a linear regression using Spark ML """ from pyspark.context import SparkContext from pyspark.ml.linalg import Vectors from pyspark.ml.regression import LinearRegression from pyspark.sql.session import SparkSession # The imports, above, allow us to access SparkML features specific to linear # regression as well as the Vectors types. # Define a function that collects the features of interest # (mother_age, father_age, and gestation_weeks) into a vector. # Package the vector in a tuple containing the label (`weight_pounds`) for that # row. def vector_from_inputs(r): return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]), float(r["father_age"]), float(r["gestation_weeks"]), float(r["weight_gain_pounds"]), float(r["apgar_5min"]))) sc = SparkContext() spark = SparkSession(sc) # Read the data from BigQuery as a Spark Dataframe. natality_data = spark.read.format("bigquery").option( "table", "natality_regression.regression_input").load() # Create a view so that Spark SQL queries can be run against the data. natality_data.createOrReplaceTempView("natality") # As a precaution, run a query in Spark SQL to ensure no NULL values exist. sql_query = """ SELECT * from natality where weight_pounds is not null and mother_age is not null and father_age is not null and gestation_weeks is not null """ clean_data = spark.sql(sql_query) # Create an input DataFrame for Spark ML using the above function. training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label", "features"]) training_data.cache() # Construct a new LinearRegression object and fit the training data. lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal") model = lr.fit(training_data) # Print the model summary. print("Coefficients:" + str(model.coefficients)) print("Intercept:" + str(model.intercept)) print("R^2:" + str(model.summary.r2)) model.summary.residuals.show()
Copie o ficheiro
natality_sparkml.py
local para um contentor do Cloud Storage no seu projeto.gcloud storage cp natality_sparkml.py gs://bucket-name
Envie a tarefa do Pyspark para o serviço Dataproc executando o comando
gcloud
, apresentado abaixo, a partir de uma janela de terminal na sua máquina local.- O valor da flag --jars torna o spark-bigquery-connector disponível
para o trabalho do PySpark em tempo de execução para lhe permitir ler
dados do BigQuery num DataFrame do Spark.
gcloud dataproc jobs submit pyspark \ gs://your-bucket/natality_sparkml.py \ --cluster=cluster-name \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- O valor da flag --jars torna o spark-bigquery-connector disponível
para o trabalho do PySpark em tempo de execução para lhe permitir ler
dados do BigQuery num DataFrame do Spark.
O resultado da regressão linear (resumo do modelo) é apresentado na janela de terminal quando a tarefa é concluída.
<<< # Imprima o resumo do modelo. ... print "Coefficients:" + str(model.coefficients) Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587] <<< print "Intercept:" + str(model.intercept) Intercept:-2.26130330748 <<< print "R^2:" + str(model.summary.r2) R^2:0.295200579035 <<< model.summary.residuals.show() +--------------------+ | residuals| +--------------------+ | -0.7234737533344147| | -0.985466980630501| | -0.6669710598385468| | 1.4162434829714794| |-0.09373154375186754| |-0.15461747949235072| | 0.32659061654192545| | 1.5053877697929803| | -0.640142797263989| | 1.229530260294963| |-0.03776160295256...| | -0.5160734239126814| | -1.5165972740062887| | 1.3269085258245008| | 1.7604670124710626| | 1.2348130901905972| | 2.318660276655887| | 1.0936947030883175| | 1.0169768511417363| | -1.7744915698181583| +--------------------+ only showing top 20 rows.