Machine learning with Dataproc, BigQuery, and Apache Spark ML

The BigQuery connector for Apache Spark lets data scientists combine the power of BigQuery's seamlessly scalable SQL engine with Apache Spark's machine learning capabilities. In this tutorial, we show how to use Dataproc, BigQuery, and Apache Spark ML to perform machine learning on a dataset.

Create a subset of the BigQuery natality data

In this section, you create a dataset in your project, then create a table in the dataset that holds a subset of birth rate data copied from the publicly available natality BigQuery dataset. Later in this tutorial, you will use the subset data in this table to predict birth weight as a function of maternal age, paternal age, and gestation weeks.

You can create the subset of data using the Google Cloud console or by running a Python script on your local machine.

Console

  1. Create a dataset in your project.

    1. Go to the BigQuery web UI.
    2. In the left navigation panel, click your project name, then click Create dataset.
    3. In the Create dataset dialog, do the following:
      1. For Dataset ID, enter "natality_regression".
      2. For Data location, you can choose a location for the dataset. The default location value is US multi-region. After a dataset is created, the location can't be changed.
      3. For Default table expiration, choose one of the following options:
        • Never (default): You must delete tables manually.
        • Number of days: Tables are deleted after the specified number of days from their creation time.
      4. For Encryption, choose one of the following options:
        • Google-managed encryption key (default): BigQuery manages the key used to encrypt the data.
        • Customer-managed encryption key (CMEK): You manage the encryption key in Cloud Key Management Service.
      5. Click Create dataset.
  2. Run a query against the public natality dataset, and then save the query results to a new table in your dataset.

    1. Copy and paste the following query into the query editor, and then click Run.
      CREATE OR REPLACE TABLE natality_regression.regression_input AS
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. When the query completes (after about one minute), the results are saved as the "regression_input" BigQuery table in the natality_regression dataset in your project.
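As an optional spot check (not part of the original steps), you can run a quick count query in the same query editor; the exact row count depends on the current contents of the public dataset.

      SELECT
      COUNT(*) AS row_count
      FROM
      natality_regression.regression_input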

Python

Before trying this sample, follow the Python setup instructions in the Dataproc quickstart using client libraries. For more information, see the Dataproc Python API reference documentation.

To authenticate to Dataproc, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
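One common way to set up Application Default Credentials for local development is with the gcloud CLI (shown here as a convenience; see the linked page for other options):

    gcloud auth application-default login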

  1. See Setting up a Python development environment for instructions on installing Python and the Google Cloud Client Library for Python (required to run the code). Installing and using a Python virtualenv is recommended.

  2. Copy and paste the natality_tutorial.py code, below, into a python shell on your local machine. Press <return> in the shell to run the code to create a "natality_regression" BigQuery dataset in your default Google Cloud project, with a "regression_input" table populated with a subset of the public natality data.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * Later in this tutorial, the "regression_input" table is read into a
        Spark cluster via the spark-bigquery-connector that bridges BigQuery
        and Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. Confirm the creation of the natality_regression dataset and the regression_input table.
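As an optional confirmation step (a minimal sketch, not part of the tutorial script), you can check the new table from the same Python environment:

    from google.cloud import bigquery

    client = bigquery.Client()

    # Fetch the table's metadata; this raises NotFound if the table
    # was not created.
    table = client.get_table("natality_regression.regression_input")
    print(f"{table.full_table_id} contains {table.num_rows} rows")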

Run a linear regression

In this section, you run a PySpark linear regression by submitting a job to the Dataproc service using the Google Cloud console or by running a gcloud command from a terminal on your local machine.

Console

  1. Copy and paste the following code into a new natality_sparkml.py file on your local machine.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Dataproc Spark cluster as a Spark DataFrame
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, gestation_weeks, weight_gain_pounds, and
    # apgar_5min) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist
    # in any of the columns used to build the training vectors.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    and weight_gain_pounds is not null
    and apgar_5min is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Copy your local natality_sparkml.py file to a Cloud Storage bucket in your project.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Run the regression from the Dataproc Submit a job page.

    1. In the Main python file field, insert the gs:// URI of the Cloud Storage bucket where your copy of the natality_sparkml.py file is located.

    2. Select PySpark as the Job type.

    3. Insert gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar in the Jar files field. This makes the spark-bigquery-connector available to the PySpark application at runtime, allowing it to read BigQuery data into a Spark DataFrame.

    4. Fill in the Job ID, Region, and Cluster fields.

    5. Click Submit to run the job on your cluster.

When the job completes, the linear regression output model summary appears in the Dataproc Job details window.
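After training, the fitted model can also score new inputs. The following is a minimal sketch (not part of the tutorial script) that assumes the variables from natality_sparkml.py are still in scope, for example in a PySpark session on the cluster; the input row is hypothetical:

    from pyspark.ml.linalg import Vectors

    # Feature order matches vector_from_inputs: mother_age, father_age,
    # gestation_weeks, weight_gain_pounds, apgar_5min (hypothetical values).
    new_rows = spark.createDataFrame(
        [(Vectors.dense(28.0, 30.0, 39.0, 30.0, 9.0),)], ["features"])

    # LinearRegressionModel.transform appends a "prediction" column.
    model.transform(new_rows).select("prediction").show()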

gcloud

  1. Copy and paste the natality_sparkml.py code listed in the Console section, above, into a new natality_sparkml.py file on your local machine.

  2. Copy your local natality_sparkml.py file to a Cloud Storage bucket in your project.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Submit the PySpark job to the Dataproc service by running the following gcloud command from a terminal window on your local machine.

    1. The --jars flag value makes the spark-bigquery-connector available to the PySpark job at runtime, allowing it to read BigQuery data into a Spark DataFrame.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
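For example, with placeholder values filled in (the bucket, cluster, and region names here are hypothetical; the version-pinned jar name can also be replaced with the gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar alias used in the Console steps):

      gcloud dataproc jobs submit pyspark \
          gs://my-bucket/natality_sparkml.py \
          --cluster=my-cluster \
          --region=us-central1 \
          --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar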

When the job completes, the linear regression output (model summary) appears in the terminal window.

>>> # Print the model summary.
>>> print("Coefficients:" + str(model.coefficients))
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
>>> print("Intercept:" + str(model.intercept))
Intercept:-2.26130330748
>>> print("R^2:" + str(model.summary.r2))
R^2:0.295200579035
>>> model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows
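As a sanity check on the summary, you can reconstruct a prediction by hand from the printed coefficients and intercept. This is a sketch using the sample output above; the input values are hypothetical:

    # Coefficients and intercept copied from the sample output above.
    coefficients = [0.0166657454602, -0.00296751984046, 0.235714392936,
                    0.00213002070133, -0.00048577251587]
    intercept = -2.26130330748

    # Feature order: mother_age, father_age, gestation_weeks,
    # weight_gain_pounds, apgar_5min (hypothetical input values).
    features = [28.0, 30.0, 39.0, 30.0, 9.0]

    prediction = intercept + sum(c * f for c, f in zip(coefficients, features))
    print(f"Predicted weight_pounds: {prediction:.2f}")  # roughly 7.4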