機械学習で Dataproc、BigQuery、Apache Spark ML を使用する

Apache Spark 用の BigQuery コネクタを使用すると、データ サイエンティストは、BigQuery のシームレスでスケーラブルな SQL エンジンの能力と Apache Spark の ML 機能を融合できます。このチュートリアルでは、Dataproc、BigQuery、Apache Spark ML を使用して、データセットに対して ML を実行する方法を示します。

BigQuery natality データのサブセットを作成する

このセクションでは、プロジェクトにデータセットを作成し、そのデータセットにテーブルを作成して、公表されている出生率 BigQuery データセットから出生率データのサブセットをコピーします。このチュートリアルの後半では、このテーブルのサブセット データを使用して、母親の年齢、父親の年齢、妊娠週に基づいて出生体重を予測します。

データ サブセットは、 Google Cloud コンソールを使用するか、ローカルマシンで Python スクリプトを実行して作成できます。

コンソール

  1. プロジェクトにデータセットを作成します。

    1. BigQuery ウェブ UI に移動します
    2. 左側のナビゲーション パネルでプロジェクト名をクリックし、続いて [データセットを作成] をクリックします。
    3. [データセットを作成] ダイアログで、次のように指定します。
      1. [データセット ID] に「natality_regression」と入力します。
      2. [データのロケーション] では、データセットのロケーションを選択できます。ロケーションのデフォルト値は、US multi-region です。データセットの作成後はロケーションを変更できません。
      3. [デフォルトのテーブルの有効期限] では、次のいずれかのオプションを選択します。
        • [無期限](デフォルト): テーブルは手動で削除する必要があります。
        • [日数]: テーブルは、作成日から指定した日数後に削除されます。
      4. [暗号化] では、次のいずれかのオプションを選択します。
      5. [データセットを作成] をクリックします。
  2. 公表されている出生率データセットに対してクエリを実行し、クエリ結果をデータセット内の新しいテーブルに保存します。

    1. 次のクエリをコピーしてクエリエディタに貼り付け、[実行] をクリックします。
      CREATE OR REPLACE TABLE natality_regression.regression_input as
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. 約 1 分後にクエリが完了すると、結果がプロジェクトの natality_regression データセットの「regression_input」BigQuery テーブルとして保存されます。

Python

このサンプルを試す前に、クライアント ライブラリを使用した Dataproc のクイックスタートにある Python の設定手順を行ってください。詳細については、Dataproc Python API のリファレンス ドキュメントをご覧ください。

Dataproc に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

  1. Python および Python 用 Google Cloud クライアント ライブラリ(コードの実行に必要)をインストールする方法については、Python 開発環境の設定をご覧ください。Python virtualenv をインストールして使用することをおすすめします。

  2. 以下の natality_tutorial.py コードをコピーしてローカルマシンの python シェルに貼り付けます。シェルで <return> キーを押してコードを実行し、デフォルトのGoogle Cloud プロジェクトに「natality_regression」BigQuery データセットを作成します。「regression_input」テーブルには、公表されている natality データのサブセットが入力されています。

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. natality_regression データセットと regression_input テーブルの作成を確認します。

線形回帰を実行する

このセクションでは、 Google Cloud コンソールを使用して Dataproc サービスにジョブを送信するか、ローカル ターミナルから gcloud コマンドを実行して PySpark 線形回帰を実行します。

コンソール

  1. ローカルマシンの新しい natality_sparkml.py ファイルに、次のコードをコピーして貼り付けます。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. ローカルの natality_sparkml.py ファイルをプロジェクトの Cloud Storage バケットにコピーします。

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Dataproc の [ジョブを送信] ページから回帰を実行します。

    1. [メインの Python ファイル] フィールドで、natality_sparkml.py ファイルのコピーが置かれている Cloud Storage バケットの gs:// URI を挿入します。

    2. [ジョブタイプ] として PySpark を選択します。

    3. gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar を [JAR ファイル] フィールドに挿入します。これにより、実行時に spark-bigquery-connector を PySpark アプリケーションで使用できるようになり、BigQuery データを Spark DataFrame に読み込めます。

    4. [ジョブ ID]、[リージョン]、[クラスタ] フィールドに入力します。

    5. [送信] をクリックして、クラスタでジョブを実行します。

ジョブが完了すると、線形回帰出力モデルの概要が Dataproc の [ジョブの詳細] ウィンドウに表示されます。

gcloud

  1. ローカルマシンの新しい natality_sparkml.py ファイルに、次のコードをコピーして貼り付けます。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. ローカルの natality_sparkml.py ファイルをプロジェクトの Cloud Storage バケットにコピーします。

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. 下に示すようにローカルマシンのターミナル ウィンドウから gcloud コマンドを実行することにより、Pyspark ジョブを Dataproc サービスに送信します。

    1. --jars フラグの値を指定することで、実行時に spark-bigquery-connector を PySpark ジョブで使用できるようになり、BigQuery データを Spark DataFrame に読み込めます。
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

ジョブが完了すると、線形回帰出力(モデル概要)がターミナル ウィンドウに表示されます。

<<< # モデルの概要を出力します。
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.