שימוש ב-Apache Spark ML ללמידת מכונה

המחבר של BigQuery ל-Apache Spark מאפשר למדעני נתונים לשלב את העוצמה של מנוע SQL עם יכולת שינוי קנה מידה חלקה עם היכולות של Apache Spark Machine Learning. במדריך הזה נראה איך להשתמש ב-Managed Service for Apache Spark, ב-BigQuery וב-Apache Spark ML כדי לבצע למידת מכונה בקבוצת נתונים.

מטרות

משתמשים ברגרסיה לינארית כדי ליצור מודל של משקל הלידה כפונקציה של חמישה גורמים:

  • שבועות הריון
  • גיל האם
  • גיל האב
  • עלייה במשקל של האם במהלך ההיריון
  • ציון אפגר

אפשר להשתמש בכלים הבאים:

  • ‫BigQuery, כדי להכין את טבלת הקלט של הרגרסיה הלינארית, שנכתבת לפרויקט Google Cloud
  • ‫Python, להרצת שאילתות ולניהול נתונים ב-BigQuery
  • ‫Apache Spark, כדי לגשת לטבלת הרגרסיה הלינארית שמתקבלת
  • ‫Spark ML, כדי לבנות ולהעריך את המודל
  • ‫Managed Service for Apache Spark PySpark job, to invoke Spark ML functions

עלויות

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

  • Compute Engine
  • Managed Service for Apache Spark
  • BigQuery

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

לפני שמתחילים

באשכול Managed Service for Apache Spark מותקנים רכיבי Spark, כולל Spark ML. כדי להגדיר אשכול של Managed Service for Apache Spark ולהריץ את הקוד בדוגמה הזו, צריך לבצע את הפעולות הבאות (או לוודא שהן בוצעו):

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  4. התקינו את ה-CLI של Google Cloud.

  5. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  6. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  8. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  9. התקינו את ה-CLI של Google Cloud.

  10. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  11. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  12. יוצרים אשכול של Managed Service for Apache Spark בפרויקט. באשכול צריך להפעיל גרסה של Managed Service for Apache Spark עם Spark 2.0 ומעלה (כולל ספריות של למידת מכונה).

יצירת קבוצת משנה של נתוני לידות ב-BigQuery

בקטע הזה, תיצרו מערך נתונים בפרויקט, ואז תיצרו טבלה במערך הנתונים שאליה תעתיקו קבוצת משנה של נתוני שיעור הילודה ממערך הנתונים הציבורי natality ב-BigQuery. בהמשך המדריך הזה תשתמשו בנתוני קבוצת המשנה שבטבלה הזו כדי לחזות את משקל הלידה כפונקציה של גיל האם, גיל האב ושבועות ההיריון.

אפשר ליצור את קבוצת המשנה של הנתונים באמצעות מסוף Google Cloud או על ידי הפעלת סקריפט Python במחשב המקומי.

המסוף

  1. יוצרים מערך נתונים בפרויקט.

    1. עוברים לממשק המשתמש של BigQuery באינטרנט.
    2. בחלונית הניווט הימנית, לוחצים על שם הפרויקט ואז על CREATE DATASET (יצירה של קבוצת נתונים).
    3. בתיבת הדו-שיח יצירת מערך נתונים:
      1. בשדה Dataset ID (מזהה מערך הנתונים), מזינים natality_regression.
      2. בקטע Data location, אפשר לבחור מיקום למערך הנתונים. מיקום ברירת המחדל הוא US multi-region. אחרי שיוצרים את מערך הנתונים, אי אפשר לשנות את המיקום שלו.
      3. בקטע Default table expiration (תפוגה של טבלה שמוגדרת כברירת מחדל), בוחרים באחת מהאפשרויות הבאות:
        • אף פעם (ברירת מחדל): צריך למחוק את הטבלה באופן ידני.
        • Number of days (מספר הימים): הטבלה תימחק אחרי מספר הימים שצוין ממועד היצירה שלה.
      4. בקטע הצפנה, בוחרים באחת מהאפשרויות הבאות:
      5. לוחצים על יצירת מערך נתונים.
  2. מריצים שאילתה על מערך הנתונים הציבורי של נתוני הלידות, ואז שומרים את תוצאות השאילתה בטבלה חדשה במערך הנתונים.

    1. מעתיקים את השאילתה הבאה ומדביקים אותה בעורך השאילתות, ואז לוחצים על Run (הפעלה).
      CREATE OR REPLACE TABLE natality_regression.regression_input as
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. אחרי שהשאילתה מסתיימת (תוך דקה בערך), התוצאות נשמרות כטבלה ב-BigQuery בשם regression_input במערך הנתונים natality_regression בפרויקט.

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Pythonההוראות להגדרה במאמר התחלה מהירה של Managed Service for Apache Spark באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Managed Service for Apache Spark Python API.

כדי לבצע אימות ל-Managed Service for Apache Spark, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

  1. הוראות להתקנת Python וספריית הלקוח של Google Cloud ל-Python (שנדרשת להרצת הקוד) מופיעות במאמר בנושא הגדרת סביבת פיתוח ב-Python. מומלץ להתקין ולהשתמש ב-Python virtualenv.

  2. מעתיקים את הקוד natality_tutorial.py שבהמשך ומדביקים אותו במעטפת python במחשב המקומי. מקישים על המקש <return> בשורת הפקודה כדי להריץ את הקוד ליצירת מערך נתונים בשם natality_regression ב-BigQuery בפרויקט ברירת המחדלGoogle Cloud עם טבלה בשם regression_input שאוכלסת עם קבוצת משנה של נתוני natality הציבוריים.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. מאשרים את היצירה של מערך הנתונים natality_regression והטבלה regression_input.

הרצת רגרסיה ליניארית

בקטע הזה תריצו רגרסיה לינארית של PySpark על ידי שליחת העבודה אל Managed Service for Apache Spark באמצעות מסוף Google Cloud או על ידי הרצת הפקודה gcloud ממסוף מקומי.

המסוף

  1. מעתיקים את הקוד הבא ומדביקים אותו בקובץ natality_sparkml.pyחדש במחשב המקומי.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. מעתיקים את הקובץ המקומי natality_sparkml.py לקטגוריה של Cloud Storage בפרויקט.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. מריצים את הרגרסיה מהדף Submit a job (שליחת משימה) ב-Managed Service for Apache Spark.

    1. בשדה Main python file (קובץ ה-Python הראשי), מזינים את ה-URI של gs:// קטגוריה של Cloud Storage שבה נמצא העותק של קובץ natality_sparkml.py.

    2. בוחרים באפשרות PySpark בתור סוג העבודה.

    3. מזינים gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar בשדה Jar files (קבצי Jar). כך, מחבר spark-bigquery-connector יהיה זמין לאפליקציית PySpark בזמן הריצה, ויאפשר לה לקרוא נתונים מ-BigQuery לתוך Spark DataFrame.

    4. ממלאים את השדות Job ID (מזהה המשימה), Region (אזור) ו-Cluster (אשכול).

    5. לוחצים על שליחה כדי להריץ את העבודה באשכול.

כשהעבודה מסתיימת, סיכום של מודל הפלט של הרגרסיה הלינארית מופיע בחלון פרטי העבודה של Managed Service for Apache Spark.

gcloud

  1. מעתיקים את הקוד הבא ומדביקים אותו בקובץ natality_sparkml.pyחדש במחשב המקומי.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. מעתיקים את הקובץ המקומי natality_sparkml.py לקטגוריה של Cloud Storage בפרויקט.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. כדי לשלוח את משימת Pyspark אל Managed Service for Apache Spark, מריצים את הפקודה gcloud שמוצגת בהמשך מחלון טרמינל במחשב המקומי.

    1. הערך של הדגל ‎--jars מאפשר למחבר spark-bigquery-connector להיות זמין למשימת PySpark בזמן הריצה, כדי שיוכל לקרוא נתונים מ-BigQuery לתוך Spark DataFrame.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

פלט הרגרסיה הלינארית (סיכום המודל) מופיע בחלון הטרמינל כשהמשימה מסתיימת.

<<< # Print the model summary.
‪... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

  

הסרת המשאבים

אחרי שמסיימים את המדריך, אפשר למחוק את המשאבים שנוצרו, כדי שהם יפסיקו להשתמש במכסה ולצבור חיובים. בסעיפים הבאים מוסבר איך למחוק או להשבית את המשאבים האלו.

מחיקת הפרויקט

הדרך הקלה ביותר לבטל את החיוב היא למחוק את הפרויקט שיצרתם בשביל המדריך הזה.

כדי למחוק את הפרויקט:

  1. במסוף Google Cloud , נכנסים לדף Manage resources.

    כניסה לדף Manage resources

  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבת הדו-שיח ולוחצים על Shut down.

מחיקת אשכול Managed Service for Apache Spark

כך מוחקים אשכול

המאמרים הבאים