Use o Dataproc, o BigQuery e o Apache Spark ML para aprendizagem automática

O conetor do BigQuery para Apache Spark permite que os cientistas de dados combinem o poder do motor SQL perfeitamente escalável do BigQuery com as capacidades de aprendizagem automática do Apache Spark. Neste tutorial, mostramos como usar o Dataproc, o BigQuery e o Apache Spark ML para realizar a aprendizagem automática num conjunto de dados.

Crie um subconjunto de dados de natalidade do BigQuery

Nesta secção, cria um conjunto de dados no seu projeto e, em seguida, cria uma tabela no conjunto de dados para o qual copia um subconjunto de dados da taxa de natalidade do conjunto de dados do BigQuery natality disponível publicamente. Mais adiante neste tutorial, vai usar os dados do subconjunto nesta tabela para prever o peso ao nascer como uma função da idade materna, da idade paterna e das semanas de gestação.

Pode criar o subconjunto de dados através da Google Cloud consola ou executando um script Python no seu computador local.

Consola

  1. Crie um conjunto de dados no seu projeto.

    1. Aceda à IU Web do BigQuery.
    2. No painel de navegação do lado esquerdo, clique no nome do projeto e, de seguida, em CRIAR CONJUNTO DE DADOS.
    3. Na caixa de diálogo Criar conjunto de dados:
      1. Para ID do conjunto de dados, introduza "natality_regression".
      2. Para Localização de dados, pode escolher uma localização para o conjunto de dados. A localização do valor predefinido é US multi-region. Após a criação de um conjunto de dados, não é possível alterar a localização.
      3. Em Validade predefinida da tabela, escolha uma das seguintes opções:
        • Nunca (predefinição): tem de eliminar a tabela manualmente.
        • Número de dias: a tabela é eliminada após o número de dias especificado a partir da hora de criação.
      4. Para Encriptação, escolha uma das seguintes opções:
      5. Clique em Criar conjunto de dados.
  2. Execute uma consulta no conjunto de dados público de natalidade e, em seguida, guarde os resultados da consulta numa nova tabela no seu conjunto de dados.

    1. Copie e cole a seguinte consulta no editor de consultas e, de seguida, clique em Executar.
      CREATE OR REPLACE TABLE natality_regression.regression_input as
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. Após a conclusão da consulta (em aproximadamente um minuto), os resultados são guardados como a tabela do BigQuery "regression_input" no conjunto de dados natality_regression no seu projeto.

Python

Antes de experimentar este exemplo, siga as Pythoninstruções de configuração no início rápido do Dataproc com as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python Dataproc.

Para se autenticar no Dataproc, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

  1. Consulte o artigo Configurar um ambiente de desenvolvimento Python para ver instruções sobre a instalação do Python e da biblioteca cliente Google Cloud para Python (necessária para executar o código). Recomendamos a instalação e a utilização de um Python virtualenv.

  2. Copie e cole o código natality_tutorial.py abaixo numa shell python na sua máquina local. Prima a tecla <return> na shell para executar o código para criar um conjunto de dados do BigQuery "natality_regression" no seu projetoGoogle Cloud predefinido com uma tabela "regression_input" preenchida com um subconjunto dos dados natality públicos.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. Confirme a criação do conjunto de dados natality_regression e da tabela regression_input.

Execute uma regressão linear

Nesta secção, vai executar uma regressão linear do PySpark enviando a tarefa para o serviço Dataproc através da Google Cloud consola ou executando o comando gcloud a partir de um terminal local.

Consola

  1. Copie e cole o seguinte código num novo natality_sparkml.py ficheiro no seu computador local.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Copie o ficheiro natality_sparkml.py local para um contentor do Cloud Storage no seu projeto.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Execute a regressão a partir da página Enviar uma tarefa do Dataproc.

    1. No campo Ficheiro Python principal, insira o URI do contentor do Cloud Storage onde se encontra a sua cópia do ficheiro natality_sparkml.py.gs://

    2. Selecione PySpark como o Tipo de serviço.

    3. Insira gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar no campo Ficheiros JAR. Isto torna o spark-bigquery-connector disponível para a aplicação PySpark no momento da execução, o que lhe permite ler dados do BigQuery num DataFrame do Spark.

    4. Preencha os campos ID da tarefa, Região e Cluster.

    5. Clique em Enviar para executar a tarefa no seu cluster.

Quando a tarefa estiver concluída, o resumo do modelo de saída de regressão linear é apresentado na janela de detalhes da tarefa do Dataproc.

gcloud

  1. Copie e cole o seguinte código num novo natality_sparkml.py ficheiro no seu computador local.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Copie o ficheiro natality_sparkml.py local para um contentor do Cloud Storage no seu projeto.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Envie a tarefa do Pyspark para o serviço Dataproc executando o comando gcloud, apresentado abaixo, a partir de uma janela de terminal na sua máquina local.

    1. O valor da flag --jars torna o spark-bigquery-connector disponível para o trabalho do PySpark em tempo de execução para lhe permitir ler dados do BigQuery num DataFrame do Spark.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

O resultado da regressão linear (resumo do modelo) é apresentado na janela de terminal quando a tarefa é concluída.

<<< # Imprima o resumo do modelo.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.