בדף הזה מוסבר איך ליצור אשכול Managed Service for Apache Spark שמשתמש במחבר Spark Spanner כדי לקרוא נתונים מ-Spanner ולכתוב נתונים ב-Spanner באמצעות Apache Spark.
המחבר של Spanner פועל עם Spark כדי לקרוא נתונים ממסד הנתונים של Spanner ולכתוב נתונים בו באמצעות ספריית Spanner Java. מחבר Spanner תומך בקריאת טבלאות וגרפים של Spanner לתוך DataFrames ו-GraphFrames של Spark, ובכתיבת נתונים של DataFrame לתוך טבלאות Spanner.
עלויות
במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:
- Managed Service for Apache Spark
- Spanner
- Cloud Storage
כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.
לפני שמתחילים
- נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- הקצאת התפקידים הנדרשים.
- הגדרת אשכול Managed Service for Apache Spark.
- הגדרת מכונת Spanner עם טבלת מסד נתונים של Singers
מתן התפקידים הנדרשים
כדי להריץ את הדוגמאות בדף הזה, צריך תפקידים מסוימים ב-IAM. יכול להיות שהתפקידים האלה כבר הוקצו, בהתאם למדיניות הארגון. כדי לבדוק את התפקידים שהוקצו, ראו האם צריך להקצות תפקידים?.
כדי לקרוא הסבר על מתן תפקידים, קראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
כדי לוודא שלחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine יש את ההרשאות שנדרשות ליצירת אשכול של Managed Service for Apache Spark, צריך לבקש מהאדמין להקצות לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine את תפקידי ה-IAM הבאים בפרויקט:
- Dataproc Worker (
roles/dataproc.worker) - משתמש במסד נתונים ב-Cloud Spanner (
roles/spanner.databaseUser) - Cloud Spanner Database Reader with DataBoost (
roles/spanner.databaseReaderWithDataBoost)
הגדרת אשכול של Managed Service for Apache Spark
יוצרים אשכול Managed Service for Apache Spark או משתמשים באשכול Managed Service for Apache Spark קיים שנוצר באמצעות תמונה של Managed Service for Apache Spark בגרסה 2.1 ואילך. אם האשכול נוצר באמצעות תמונה בגרסה 2.0 או בגרסה קודמת, הוא צריך להיות נוצר עם המאפיין scope שהוגדר להיקף cloud-platform.
הגדרת מכונת Spanner עם טבלת מסד נתונים של Singers
יוצרים מופע Spanner עם מסד נתונים שמכיל טבלה בשם Singers. שימו לב למזהה המכונה ולמזהה מסד הנתונים ב-Spanner.
שימוש במחבר Spanner עם Spark
המחבר של Spanner זמין לגרסאות Spark 3.1+. מציינים את גרסת המחבר כחלק מהמפרט של קובץ ה-JAR של המחבר ל-Cloud Storage כששולחים משימה לאשכול Managed Service for Apache Spark.
דוגמה: שליחת משימת Spark באמצעות gcloud CLI עם מחבר Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
מחליפים את מה שכתוב בשדות הבאים:
CONNECTOR_VERSION: גרסת מחבר Spanner.
בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר GoogleCloudDataproc/spark-spanner-connector ב-GitHub.
קריאת טבלאות Spanner
אתם יכולים להשתמש ב-Python או ב-Scala כדי לקרוא נתונים מטבלת Spanner לתוך Spark Dataframe באמצעות Spark data source API.
PySpark
אתם יכולים להריץ את קוד PySpark לדוגמה שמופיע בקטע הזה באשכול שלכם על ידי שליחת המשימה אל Managed Service for Apache Spark או על ידי הרצת המשימה מ-spark-submit REPL בצומת הראשי של האשכול.
משימה של Managed Service for Apache Spark
- יוצרים קובץ
singers.pyבאמצעות עורך טקסט מקומי או ב-Cloud Shell באמצעות עורך הטקסטvi,vimאוnanoשהותקן מראש. - אחרי שממלאים את משתני ה-placeholder, מדביקים את הקוד הבא בקובץ
singers.py. הערה: התכונה Data Boost של Spanner מופעלת, וההשפעה שלה על מופע Spanner הראשי היא כמעט אפסית.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
מחליפים את מה שכתוב בשדות הבאים:
- PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
- INSTANCE_ID, DATABASE_ID ו-TABLE_NAME : ראו הגדרה של מופע Spanner עם טבלת מסד נתונים
Singers.
- שומרים את קובץ ה-
singers.py. - שליחת העבודה אל Managed Service for Apache Spark באמצעות Google Cloud המסוף, ה-CLI של gcloud או API בארכיטקטורת REST.
דוגמה: שליחת משימה באמצעות ה-CLI של gcloud עם מחבר Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jarמחליפים את מה שכתוב בשדות הבאים:
- CLUSTER_NAME: השם של האשכול החדש.
- REGION: אזור זמין ב-Compute Engine להרצת עומס העבודה.
- CONNECTOR_VERSION: גרסת מחבר Spanner.
בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר
GoogleCloudDataproc/spark-spanner-connectorב-GitHub.
משימת spark-submit
- מתחברים לצומת הראשי של אשכול Managed Service for Apache Spark באמצעות SSH.
- נכנסים לדף Clusters של Managed Service for Apache Spark במסוף Google Cloud ולוחצים על שם האשכול.
- בדף פרטי האשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. לאחר מכן לוחצים על
SSHמשמאל לשם של מאסטר האשכולות.
חלון דפדפן ייפתח בספריית הבית שלכם בצומת הראשי.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- יוצרים קובץ
singers.pyבצומת הראשי באמצעות עורך הטקסטvi,vimאוnanoשהותקן מראש.- מדביקים את הקוד הבא בקובץ
singers.pyאחרי שממלאים את משתני ה-placeholder בקובץsingers.py. שימו לב שהתכונה Data Boost של Spanner מופעלת, וההשפעה שלה על מופע Spanner הראשי היא כמעט אפסית.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
מחליפים את מה שכתוב בשדות הבאים:
- PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
- INSTANCE_ID, DATABASE_ID ו-TABLE_NAME : ראו הגדרה של מופע Spanner עם טבלת מסד נתונים
Singers.
- שומרים את קובץ ה-
singers.py.
- מדביקים את הקוד הבא בקובץ
- מריצים את הפקודה
singers.pyעםspark-submitכדי ליצור את טבלת SpannerSingers.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
מחליפים את מה שכתוב בשדות הבאים:
- CONNECTOR_VERSION: גרסת מחבר Spanner.
בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר
GoogleCloudDataproc/spark-spanner-connectorב-GitHub.
הפלט שיתקבל:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: גרסת מחבר Spanner.
בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר
Scala
כדי להריץ את קוד Scala לדוגמה באשכול:
- מתחברים לצומת הראשי של אשכול Managed Service for Apache Spark באמצעות SSH.
- נכנסים לדף Clusters של Managed Service for Apache Spark במסוף Google Cloud ולוחצים על שם האשכול.
- בדף פרטי האשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. לאחר מכן לוחצים על
SSHמשמאל לשם של מאסטר האשכולות.
חלון דפדפן ייפתח בספריית הבית שלכם בצומת הראשי.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- יוצרים קובץ
singers.scalaבצומת הראשי באמצעות עורך הטקסטvi,vimאוnanoשהותקן מראש.- מדביקים את הקוד הבא בקובץ
singers.scala. שימו לב שהתכונה Data Boost של Spanner מופעלת, וההשפעה שלה על מופע Spanner הראשי היא כמעט אפסית.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
מחליפים את מה שכתוב בשדות הבאים:
- PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
- INSTANCE_ID, DATABASE_ID ו-TABLE_NAME : ראו הגדרה של מופע Spanner עם טבלת מסד נתונים
Singers.
- שומרים את קובץ ה-
singers.scala.
- מדביקים את הקוד הבא בקובץ
- מפעילים את
spark-shellREPL.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
מחליפים את מה שכתוב בשדות הבאים:
CONNECTOR_VERSION: גרסת מחבר Spanner. בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר
GoogleCloudDataproc/spark-spanner-connectorב-GitHub. - מריצים את הפקודה
singers.scalaעם הפקודה:load singers.scalaכדי ליצור את טבלת SpannerSingers. בדוגמה הבאה מוצג פלט של רשימת הזמרים.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
קריאת גרפים של Spanner
מחבר Spanner תומך בייצוא הגרף לDataFrames נפרדים של צמתים וקצוות, וגם בייצוא ישירות ל-GraphFrames.
בדוגמה הבאה מיוצאת טבלה מ-Spanner אל GraphFrame. הוא משתמש במחלקה SpannerGraphConnectorPython, שכלולה ב-jar של מחבר Spanner, כדי לקרוא את Spanner Graph.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
מחליפים את מה שכתוב בשדות הבאים:
- CONNECTOR_VERSION: גרסת מחבר Spanner.
בוחרים את הגרסה של מחבר Spanner מתוך רשימת הגרסאות במאגר GitHub
GoogleCloudDataproc/spark-spanner-connector. - PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
- INSTANCE_ID, DATABASE_ID ו-TABLE_NAME מוסיפים את מזהי המופע, מסד הנתונים והגרף.
כדי לייצא צומת וקצה DataFrames במקום GraphFrames, משתמשים ב-load_dfs
במקום:
df_vertices, df_edges, df_id_map = connector.load_dfs()
כתיבה של טבלאות Spanner
מחבר Spanner תומך בכתיבת Spark Dataframe לטבלת Spanner באמצעות Spark data source API.
דוגמה לכתיבת DataFrame לטבלת Spanner
מאכלסים את המשתנים לפני ששומרים ומריצים את הקוד.
"""Spanner PySpark write example.""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Spanner Write App').getOrCreate() columns = ['id', 'name', 'email'] data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')] df = spark.createDataFrame(data, columns) df.write.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .mode("append") \ .save()
מחליפים את הפרטים הבאים.
- PROJECT_ID: מזהה הפרויקט ב- Google Cloud . מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud .
- INSTANCE_ID, DATABASE_ID ו-TABLE_NAME מוסיפים את מזהי המופע, מסד הנתונים והטבלה.
הסרת המשאבים
כדי למנוע חיובים שוטפים בחשבון Google Cloud , אפשר להפסיק או למחוק את אשכול Managed Service for Apache Spark ולמחוק את מופע Spanner.
המאמרים הבאים
- אפשר לעיין ב
pyspark.sql.DataFrameדוגמאות. - למידע על תמיכה בשפות ב-Spark DataFrame, אפשר לעיין במאמרים הבאים:
- אפשר לעיין במאגר Spark Spanner Connector ב-GitHub.
- טיפים לשיפור ביצועים של משימות Spark