Menggunakan Apache Spark ML untuk machine learning

Konektor BigQuery untuk Apache Spark memungkinkan Data Scientist memadukan kecanggihan mesin SQL yang dapat diskalakan dengan lancar dari BigQuery dengan kemampuan Machine Learning Apache Spark. Dalam tutorial ini, kami menunjukkan cara menggunakan Managed Service for Apache Spark, BigQuery, dan Apache Spark ML untuk melakukan machine learning pada set data.

Tujuan

Gunakan regresi linear untuk membuat model berat badan bayi saat lahir sebagai fungsi dari lima faktor:

  • minggu kehamilan
  • usia ibu
  • usia ayah
  • penambahan berat badan ibu selama kehamilan
  • Skor Apgar

Gunakan alat berikut:

  • BigQuery, untuk menyiapkan tabel input regresi linear, yang ditulis ke project Google Cloud Anda
  • Python, untuk membuat kueri dan mengelola data di BigQuery
  • Apache Spark, untuk mengakses tabel regresi linier yang dihasilkan
  • Spark ML, untuk membuat dan mengevaluasi model
  • Tugas PySpark Managed Service for Apache Spark, untuk memanggil fungsi Spark ML

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

  • Compute Engine
  • Managed Service for Apache Spark
  • BigQuery

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Cluster Managed Service for Apache Spark telah menginstal komponen Spark, termasuk Spark ML. Untuk menyiapkan cluster Managed Service for Apache Spark dan menjalankan kode dalam contoh ini, Anda harus melakukan (atau telah melakukan) hal berikut:

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  4. Instal Google Cloud CLI.

  5. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  6. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  8. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  9. Instal Google Cloud CLI.

  10. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  11. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  12. Buat cluster Managed Service for Apache Spark di project Anda. Cluster Anda harus menjalankan Managed Service for Apache Spark version dengan Spark 2.0 atau yang lebih tinggi, (termasuk library machine learning).

Membuat subset data natalitas BigQuery

Di bagian ini, Anda akan membuat set data di project, lalu membuat tabel di set data yang akan Anda salin ke subset data tingkat kelahiran dari set data BigQuery natality yang tersedia untuk publik. Nanti dalam tutorial ini, Anda akan menggunakan data subset dalam tabel ini untuk memprediksi berat lahir sebagai fungsi dari usia ibu, usia ayah, dan minggu kehamilan.

Anda dapat membuat subset data menggunakan konsol Google Cloud atau menjalankan skrip Python di mesin lokal Anda.

Konsol

  1. Buat set data di project Anda.

    1. Buka UI Web BigQuery.
    2. Di panel navigasi kiri, klik nama project Anda, lalu klik CREATE DATASET.
    3. Dalam dialog Buat set data:
      1. Untuk Dataset ID, masukkan "natality_regression".
      2. Untuk Lokasi data, Anda dapat memilih lokasi untuk set data. Lokasi nilai defaultnya adalah US multi-region. Setelah set data dibuat, lokasi tidak dapat diubah.
      3. Untuk Default table expiration, pilih salah satu opsi berikut:
        • Tidak pernah (default): Anda harus menghapus tabel secara manual.
        • Jumlah hari: Tabel akan dihapus setelah jumlah hari yang ditentukan sejak waktu pembuatannya.
      4. Untuk Enkripsi, pilih salah satu opsi berikut:
      5. Klik Create dataset.
  2. Jalankan kueri terhadap set data publik kelahiran, lalu simpan hasil kueri dalam tabel baru di set data Anda.

    1. Salin dan tempel kueri berikut ke dalam Editor Kueri, lalu klik Jalankan.
      CREATE OR REPLACE TABLE natality_regression.regression_input as
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. Setelah kueri selesai (dalam waktu sekitar satu menit), hasilnya akan disimpan sebagai tabel BigQuery "regression_input" dalam set data natality_regression di project Anda.

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan memulai Managed Service for Apache Spark menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Python Managed Service for Apache Spark.

Untuk melakukan autentikasi ke Managed Service for Apache Spark, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

  1. Lihat Menyiapkan Lingkungan Pengembangan Python untuk mengetahui petunjuk tentang cara menginstal Python dan Library Klien Google Cloud untuk Python (diperlukan untuk menjalankan kode). Sebaiknya instal dan gunakan virtualenv Python.

  2. Salin dan tempel kode natality_tutorial.py, di bawah, ke dalam shell python di komputer lokal Anda. Tekan tombol <return> di shell untuk menjalankan kode guna membuat set data BigQuery "natality_regression" di projectGoogle Cloud default Anda dengan tabel "regression_input" yang diisi dengan subset data natalitypublik.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. Konfirmasi pembuatan set data natality_regression dan tabel regression_input.

Menjalankan regresi linear

Di bagian ini, Anda akan menjalankan regresi linear PySpark dengan mengirimkan tugas ke Managed Service for Apache Spark menggunakan konsol Google Cloud atau dengan menjalankan perintah gcloud dari terminal lokal.

Konsol

  1. Salin dan tempel kode berikut ke dalam file natality_sparkml.py baru di mesin lokal Anda.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Salin file natality_sparkml.py lokal ke bucket Cloud Storage di project Anda.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Jalankan regresi dari halaman Managed Service for Apache Spark Submit a job.

    1. Di kolom Main python file, masukkan URI gs:// dari bucket Cloud Storage tempat salinan file natality_sparkml.py Anda berada.

    2. Pilih PySpark sebagai Jenis tugas.

    3. Masukkan gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar di kolom File jar. Hal ini membuat spark-bigquery-connector tersedia untuk aplikasi PySpark saat runtime agar dapat membaca data BigQuery ke dalam DataFrame Spark.

    4. Isi kolom Job ID, Region, dan Cluster.

    5. Klik Kirim untuk menjalankan tugas di cluster Anda.

Setelah tugas selesai, ringkasan model output regresi linear akan muncul di jendela detail Tugas Managed Service for Apache Spark.

gcloud

  1. Salin dan tempel kode berikut ke dalam file natality_sparkml.py baru di mesin lokal Anda.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Salin file natality_sparkml.py lokal ke bucket Cloud Storage di project Anda.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Kirimkan tugas Pyspark ke Managed Service for Apache Spark dengan menjalankan perintah gcloud, yang ditunjukkan di bawah, dari jendela terminal di komputer lokal Anda.

    1. Nilai tanda --jars membuat spark-bigquery-connector tersedia untuk tugas PySpark saat runtime agar dapat membaca data BigQuery ke dalam DataFrame Spark.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

Output regresi linear (ringkasan model) akan muncul di jendela terminal setelah tugas selesai.

<<< # Cetak ringkasan model.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

  

Pembersihan

Setelah menyelesaikan tutorial, Anda dapat membersihkan resource yang dibuat agar tidak lagi menggunakan kuota dan menimbulkan tagihan. Bagian berikut menjelaskan cara menghapus atau menonaktifkan resource ini.

Menghapus project

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project yang Anda buat untuk tutorial.

Untuk menghapus project:

  1. Di Konsol Google Cloud , buka halaman Manage resources.

    Buka Kelola resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus cluster Managed Service for Apache Spark

Lihat Menghapus cluster.

Langkah berikutnya