שימוש במחבר Spark Spanner

בדף הזה מוסבר איך ליצור אשכול Managed Service for Apache Spark שמשתמש במחבר Spark Spanner כדי לקרוא נתונים מ-Spanner ולכתוב נתונים ב-Spanner באמצעות Apache Spark.

המחבר של Spanner פועל עם Spark כדי לקרוא נתונים ממסד הנתונים של Spanner ולכתוב נתונים בו באמצעות ספריית Spanner Java. מחבר Spanner תומך בקריאת טבלאות וגרפים של Spanner לתוך DataFrames ו-GraphFrames של Spark, ובכתיבת נתונים של DataFrame לתוך טבלאות Spanner.

עלויות

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

  • Managed Service for Apache Spark
  • Spanner
  • Cloud Storage

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

לפני שמתחילים

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. הקצאת התפקידים הנדרשים.
  9. הגדרת אשכול Managed Service for Apache Spark.
  10. הגדרת מכונת Spanner עם טבלת מסד נתונים של Singers

מתן התפקידים הנדרשים

כדי להריץ את הדוגמאות בדף הזה, צריך תפקידים מסוימים ב-IAM. יכול להיות שהתפקידים האלה כבר הוקצו, בהתאם למדיניות הארגון. כדי לבדוק את התפקידים שהוקצו, ראו האם צריך להקצות תפקידים?.

כדי לקרוא הסבר על מתן תפקידים, קראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

כדי לוודא שלחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine יש את ההרשאות שנדרשות ליצירת אשכול של Managed Service for Apache Spark, צריך לבקש מהאדמין להקצות לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine את תפקידי ה-IAM הבאים בפרויקט:

הגדרת אשכול של Managed Service for Apache Spark

יוצרים אשכול Managed Service for Apache Spark או משתמשים באשכול Managed Service for Apache Spark קיים שנוצר באמצעות תמונה של Managed Service for Apache Spark בגרסה 2.1 ואילך. אם האשכול נוצר באמצעות תמונה בגרסה 2.0 או בגרסה קודמת, הוא צריך להיות נוצר עם המאפיין scope שהוגדר להיקף cloud-platform.

הגדרת מכונת Spanner עם טבלת מסד נתונים של Singers

יוצרים מופע Spanner עם מסד נתונים שמכיל טבלה בשם Singers. שימו לב למזהה המכונה ולמזהה מסד הנתונים ב-Spanner.

שימוש במחבר Spanner עם Spark

המחבר של Spanner זמין לגרסאות Spark‏ 3.1+. מציינים את גרסת המחבר כחלק מהמפרט של קובץ ה-JAR של המחבר ל-Cloud Storage כששולחים משימה לאשכול Managed Service for Apache Spark.

דוגמה: שליחת משימת Spark באמצעות gcloud CLI עם מחבר Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

מחליפים את מה שכתוב בשדות הבאים:

CONNECTOR_VERSION: גרסת מחבר Spanner. בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר GoogleCloudDataproc/spark-spanner-connector ב-GitHub.

קריאת טבלאות Spanner

אתם יכולים להשתמש ב-Python או ב-Scala כדי לקרוא נתונים מטבלת Spanner לתוך Spark Dataframe באמצעות Spark data source API.

PySpark

אתם יכולים להריץ את קוד PySpark לדוגמה שמופיע בקטע הזה באשכול שלכם על ידי שליחת המשימה אל Managed Service for Apache Spark או על ידי הרצת המשימה מ-spark-submit REPL בצומת הראשי של האשכול.

משימה של Managed Service for Apache Spark

  1. יוצרים קובץ singers.py באמצעות עורך טקסט מקומי או ב-Cloud Shell באמצעות עורך הטקסט vi,‏ vim או nano שהותקן מראש.
    1. אחרי שממלאים את משתני ה-placeholder, מדביקים את הקוד הבא בקובץ singers.py. הערה: התכונה Data Boost של Spanner מופעלת, וההשפעה שלה על מופע Spanner הראשי היא כמעט אפסית.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      מחליפים את מה שכתוב בשדות הבאים:

      1. PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
      2. INSTANCE_ID,‏ DATABASE_ID ו-TABLE_NAME : ראו הגדרה של מופע Spanner עם טבלת מסד נתונים Singers.
    2. שומרים את קובץ ה-singers.py.
  2. שליחת העבודה אל Managed Service for Apache Spark באמצעות Google Cloud המסוף, ה-CLI של gcloud או API בארכיטקטורת REST.

    דוגמה: שליחת משימה באמצעות ה-CLI של gcloud עם מחבר Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    מחליפים את מה שכתוב בשדות הבאים:

    1. CLUSTER_NAME: השם של האשכול החדש.
    2. REGION: אזור זמין ב-Compute Engine להרצת עומס העבודה.
    3. CONNECTOR_VERSION: גרסת מחבר Spanner. בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר GoogleCloudDataproc/spark-spanner-connector ב-GitHub.

משימת spark-submit

  1. מתחברים לצומת הראשי של אשכול Managed Service for Apache Spark באמצעות SSH.
    1. נכנסים לדף Clusters של Managed Service for Apache Spark במסוף Google Cloud ולוחצים על שם האשכול.
    2. בדף פרטי האשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. לאחר מכן לוחצים על SSH משמאל לשם של מאסטר האשכולות.
      צילום מסך של הדף Dataproc Cluster details במסוף Google Cloud , שבו מוצג הלחצן SSH שמשמש להתחברות לצומת הראשי של האשכול.

      חלון דפדפן ייפתח בספריית הבית שלכם בצומת הראשי.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. יוצרים קובץ singers.py בצומת הראשי באמצעות עורך הטקסט vi,‏ vim או nano שהותקן מראש.
    1. מדביקים את הקוד הבא בקובץ singers.py אחרי שממלאים את משתני ה-placeholder בקובץ singers.py. שימו לב שהתכונה Data Boost של Spanner מופעלת, וההשפעה שלה על מופע Spanner הראשי היא כמעט אפסית.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      מחליפים את מה שכתוב בשדות הבאים:

      1. PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
      2. INSTANCE_ID,‏ DATABASE_ID ו-TABLE_NAME : ראו הגדרה של מופע Spanner עם טבלת מסד נתונים Singers.
    2. שומרים את קובץ ה-singers.py.
  3. מריצים את הפקודה singers.py עם spark-submit כדי ליצור את טבלת Spanner Singers.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    מחליפים את מה שכתוב בשדות הבאים:

    1. CONNECTOR_VERSION: גרסת מחבר Spanner. בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר GoogleCloudDataproc/spark-spanner-connector ב-GitHub.

    הפלט שיתקבל:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

כדי להריץ את קוד Scala לדוגמה באשכול:

  1. מתחברים לצומת הראשי של אשכול Managed Service for Apache Spark באמצעות SSH.
    1. נכנסים לדף Clusters של Managed Service for Apache Spark במסוף Google Cloud ולוחצים על שם האשכול.
    2. בדף פרטי האשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. לאחר מכן לוחצים על SSH משמאל לשם של מאסטר האשכולות. הדף Dataproc Cluster details במסוף Google Cloud .

      חלון דפדפן ייפתח בספריית הבית שלכם בצומת הראשי.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. יוצרים קובץ singers.scala בצומת הראשי באמצעות עורך הטקסט vi,‏ vim או nano שהותקן מראש.
    1. מדביקים את הקוד הבא בקובץ singers.scala. שימו לב שהתכונה Data Boost של Spanner מופעלת, וההשפעה שלה על מופע Spanner הראשי היא כמעט אפסית.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      מחליפים את מה שכתוב בשדות הבאים:

      1. PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
      2. INSTANCE_ID,‏ DATABASE_ID ו-TABLE_NAME : ראו הגדרה של מופע Spanner עם טבלת מסד נתונים Singers.
    2. שומרים את קובץ ה-singers.scala.
  3. מפעילים את spark-shell REPL.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    מחליפים את מה שכתוב בשדות הבאים:

    CONNECTOR_VERSION: גרסת מחבר Spanner. בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר GoogleCloudDataproc/spark-spanner-connector ב-GitHub.

  4. מריצים את הפקודה singers.scala עם הפקודה :load singers.scala כדי ליצור את טבלת Spanner‏ Singers. בדוגמה הבאה מוצג פלט של רשימת הזמרים.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

קריאת גרפים של Spanner

מחבר Spanner תומך בייצוא הגרף לDataFrames נפרדים של צמתים וקצוות, וגם בייצוא ישירות ל-GraphFrames.

בדוגמה הבאה מיוצאת טבלה מ-Spanner אל GraphFrame. הוא משתמש במחלקה SpannerGraphConnectorPython, שכלולה ב-jar של מחבר Spanner, כדי לקרוא את Spanner Graph.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

מחליפים את מה שכתוב בשדות הבאים:

  • CONNECTOR_VERSION: גרסת מחבר Spanner. בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר GitHub‏ GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
  • INSTANCE_ID, DATABASE_ID ו-TABLE_NAME מוסיפים את מזהי המופע, מסד הנתונים והגרף.

כדי לייצא צומת וקצה DataFrames במקום GraphFrames, משתמשים ב-load_dfs במקום:

df_vertices, df_edges, df_id_map = connector.load_dfs()

כתיבה של טבלאות Spanner

מחבר Spanner תומך בכתיבת Spark Dataframe לטבלת Spanner באמצעות Spark data source API.

דוגמה לכתיבת DataFrame לטבלת Spanner

מאכלסים את המשתנים לפני ששומרים ומריצים את הקוד.

"""Spanner PySpark write example."""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID")
    .option("instanceId", "INSTANCE_ID")
    .option("databaseId", "DATABASE_ID")
    .option("table", "TABLE_NAME")
    .mode("append") \
    .save()

מחליפים את הפרטים הבאים.

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
  • INSTANCE_ID, DATABASE_ID ו-TABLE_NAME מוסיפים את מזהי המופע, מסד הנתונים והטבלה.

הסרת המשאבים

כדי למנוע חיובים שוטפים בחשבון Google Cloud , אפשר להפסיק או למחוק את אשכול Managed Service for Apache Spark ולמחוק את מופע Spanner.

המאמרים הבאים