Machine Learning with Dataproc, BigQuery, and Apache Spark ML

The BigQuery connector for Apache Spark lets data scientists combine the power of BigQuery's seamlessly scalable SQL engine with Apache Spark's machine learning capabilities. In this tutorial, we show how to use Dataproc, BigQuery, and Apache Spark ML to perform machine learning on a dataset.

Create a subset of the BigQuery natality data

In this section, you create a dataset in your project, then create a table in the dataset into which you copy a subset of data from the publicly available natality BigQuery dataset. Later in the tutorial, you use the subset data in this table to predict birth weight as a function of five features: maternal age, paternal age, gestation weeks, maternal weight gain, and the baby's 5-minute APGAR score.
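Concretely, the linear regression you run in the final section fits a model of the form

    weight_pounds ≈ β0 + β1·mother_age + β2·father_age + β3·gestation_weeks + β4·weight_gain_pounds + β5·apgar_5min

where Spark ML estimates the intercept β0 and the coefficients β1 through β5 from the training data.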

You can create the subset of data using the Google Cloud console or by running a Python script on your local machine.

Console

  1. Create a dataset in your project.

    1. Go to the BigQuery web UI.
    2. In the left navigation panel, click your project name, then click Create dataset.
    3. In the Create dataset dialog:
      1. For Dataset ID, enter "natality_regression".
      2. For Data location, you can choose a location for the dataset. The default location is the US multi-region. The location can't be changed after the dataset is created.
      3. For Default table expiration, choose one of the following options:
        • Never (default): You must delete the table manually.
        • Number of days: The table is deleted after the specified number of days from its creation time.
      4. For Encryption, choose one of the following options:
        • Google-managed encryption key (default)
        • Customer-managed encryption key (CMEK)
      5. Click Create dataset.
  2. Run a query against the public natality dataset, then save the query results in a new table in your dataset.

    1. Copy and paste the following query into the query editor, then click Run.
      CREATE OR REPLACE TABLE natality_regression.regression_input AS
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. When the query completes (in approximately one minute), the results are saved as the "regression_input" BigQuery table in the natality_regression dataset in your project.
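Alternatively, you can perform both steps from a terminal with the bq command-line tool; a minimal sketch, assuming your default project is set and the dataset does not already exist:

    # Create the dataset in the default project.
    bq --location=US mk --dataset natality_regression

    # Run the same CREATE OR REPLACE TABLE query shown above.
    bq query --use_legacy_sql=false \
    'CREATE OR REPLACE TABLE natality_regression.regression_input AS
     SELECT weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
     FROM `bigquery-public-data.samples.natality`
     WHERE weight_pounds IS NOT NULL AND mother_age IS NOT NULL
       AND father_age IS NOT NULL AND gestation_weeks IS NOT NULL
       AND weight_gain_pounds IS NOT NULL AND apgar_5min IS NOT NULL'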

Python

Before trying this sample, follow the Python setup instructions in the Dataproc quickstart using client libraries. For more information, see the Dataproc Python API reference documentation.

To authenticate to Dataproc, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
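On a local machine, these credentials are typically created with the Google Cloud CLI:

    gcloud auth application-default login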

  1. See Setting up a Python development environment for instructions on installing Python and the Google Cloud Client Library for Python (needed to run the code). Installing and using a Python virtualenv is recommended.

  2. Copy and paste the natality_tutorial.py code, below, into a python shell on your local machine. Press the <return> key in the shell to run the code to create a "natality_regression" BigQuery dataset in your default Google Cloud project with a "regression_input" table that is populated with a subset of the public natality data.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The destination "regression_input" table is created in the user's
        default project; a later section of this tutorial reads it into a
        Dataproc Spark cluster with the spark-bigquery connector.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. Verify that the natality_regression dataset and the regression_input table were created.
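As a quick check, you can confirm the table exists and see its row count with the same client library; a minimal sketch:

    from google.cloud import bigquery

    client = bigquery.Client()
    # Look up the new destination table and report how many rows it holds.
    table = client.get_table(f"{client.project}.natality_regression.regression_input")
    print(f"{table.full_table_id}: {table.num_rows} rows")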

Run a linear regression

In this section, you run a PySpark linear regression by submitting the job to the Dataproc service using the Google Cloud console or by running the gcloud command from a local terminal.

Console

  1. Copy and paste the following code into a new natality_sparkml.py file on your local machine.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, gestation_weeks, weight_gain_pounds, and
    # apgar_5min) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for
    # that row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Copy your local natality_sparkml.py file to a Cloud Storage bucket in your project.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Run the regression from the Dataproc Submit a job page.

    1. In the Main python file field, insert the gs:// URI of the Cloud Storage bucket where your copy of the natality_sparkml.py file is located.

    2. Select PySpark as the Job type.

    3. Insert gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar in the Jar files field. This makes the spark-bigquery-connector available to the PySpark application at runtime to allow it to read BigQuery data into a Spark DataFrame.

    4. Fill in the Job ID, Region, and Cluster fields.

    5. Click Submit to run the job on your cluster.

When the job completes, the linear regression output model summary appears in the Dataproc Job details window.
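A note on the script above: it builds the training DataFrame by mapping vector_from_inputs over an RDD, which works but is somewhat dated; in current Spark the features column is more commonly assembled with pyspark.ml.feature.VectorAssembler. A minimal sketch, assuming the clean_data DataFrame from the script:

    from pyspark.ml.feature import VectorAssembler

    # Assemble the five feature columns into a single "features" vector column.
    assembler = VectorAssembler(
        inputCols=["mother_age", "father_age", "gestation_weeks",
                   "weight_gain_pounds", "apgar_5min"],
        outputCol="features")
    training_data = (assembler.transform(clean_data)
                     .withColumnRenamed("weight_pounds", "label")
                     .select("label", "features"))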

gcloud

  1. Copy and paste the following code into a new natality_sparkml.py file on your local machine.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, gestation_weeks, weight_gain_pounds, and
    # apgar_5min) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for
    # that row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Copy your local natality_sparkml.py file to a Cloud Storage bucket in your project.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Run the following gcloud command from a terminal window on your local machine to submit the PySpark job to the Dataproc service.

    1. The --jars flag value makes the spark-bigquery-connector available to the PySpark job at runtime, which allows the job to read BigQuery data into a Spark DataFrame.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

When the job completes, the linear regression output (model summary) appears in the terminal window.

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows
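The fitted model can also score new rows. The following is a hypothetical sketch (the input values are made up) that could be appended to natality_sparkml.py; the feature order must match vector_from_inputs:

    from pyspark.ml.linalg import Vectors

    # Hypothetical input row: mother_age=29, father_age=31, gestation_weeks=38,
    # weight_gain_pounds=30, apgar_5min=9.
    example = spark.createDataFrame(
        [(Vectors.dense(29.0, 31.0, 38.0, 30.0, 9.0),)], ["features"])
    model.transform(example).select("prediction").show()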