שימוש במחבר Bigtable Spark
מחבר Bigtable Spark מאפשר לכם לקרוא ולכתוב נתונים אל Bigtable וממנו. אפשר לקרוא נתונים מאפליקציית Spark באמצעות Spark SQL ו-DataFrames. הפעולות הבאות ב-Bigtable נתמכות באמצעות מחבר Bigtable Spark:
- כתיבת נתונים
- קריאת נתונים
- יצירת טבלה חדשה
במאמר הזה מוסבר איך להמיר טבלה של Spark SQL DataFrames לטבלה של Bigtable, ואז לקמפל וליצור קובץ JAR כדי לשלוח עבודת Spark.
סטטוס התמיכה ב-Spark וב-Scala
מחבר Bigtable Spark תומך בגרסאות הבאות של Scala:
מחבר Bigtable Spark תומך בגרסאות הבאות של Spark:
מחבר Bigtable Spark תומך בגרסאות הבאות של Managed Service for Apache Spark:
- 1.5 image-version cluster
- 2.0 image-version cluster
- 2.1 image-version cluster
- 2.2 image-version cluster
- Dataproc Serverless runtime version 1.0
חישוב העלויות
אם תחליטו להשתמש באחד מהרכיבים הבאים של Google Cloud, השימוש במשאבים יחויב:
- Bigtable (לא נגבה תשלום על שימוש באמולטור Bigtable)
- Managed Service for Apache Spark
- Cloud Storage
התמחור של Managed Service for Apache Spark חל על השימוש ב-Managed Service for Apache Spark באשכולות של Compute Engine. התמחור של Managed Service for Apache Spark Serverless חל על עומסי עבודה ועל סשנים שמופעלים ב-Managed Service for Apache Spark Serverless for Spark.
כדי ליצור הערכת עלויות בהתאם לשימוש החזוי, אתם יכולים להשתמש במחשבון התמחור.
לפני שמתחילים
לפני שמשתמשים במחבר Bigtable Spark, צריך לוודא שמתקיימות הדרישות המוקדמות הבאות.
התפקידים הנדרשים
כדי לקבל את ההרשאות שדרושות לשימוש ב-Bigtable Spark connector, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בפרויקט:
-
אדמין של Bigtable (
roles/bigtable.admin)(אופציונלי): מאפשר לקרוא או לכתוב נתונים וליצור טבלה חדשה. -
משתמש Bigtable (
roles/bigtable.user): מאפשר לקרוא או לכתוב נתונים, אבל לא מאפשר ליצור טבלה חדשה.
להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.
אם אתם משתמשים ב-Managed Service for Apache Spark או ב-Cloud Storage, יכול להיות שתידרשו הרשאות נוספות. מידע נוסף זמין במאמרים הרשאות בשירות המנוהל ל-Apache Spark והרשאות ב-Cloud Storage.
הגדרת Spark
בנוסף ליצירת מופע Bigtable, צריך גם להגדיר את מופע Spark. אפשר לעשות את זה באופן מקומי או לבחור באחת מהאפשרויות הבאות כדי להשתמש ב-Spark עם Managed Service for Apache Spark:
- אשכול Managed Service for Apache Spark
- Managed Service for Apache Spark Serverless
מידע נוסף על בחירה בין אשכול Managed Service for Apache Spark לבין אפשרות ללא שרת זמין במאמר Managed Service for Apache Spark Serverless for Spark בהשוואה ל-Managed Service for Apache Spark ב-Compute Engine .
הורדת קובץ ה-JAR של המחבר
אפשר למצוא את קוד המקור של מחבר Bigtable Spark עם דוגמאות במאגר GitHub של מחבר Bigtable Spark.
בהתאם להגדרה של Spark, אפשר לגשת לקובץ ה-JAR באופן הבא:
אם אתם מריצים את PySpark באופן מקומי, אתם צריכים להוריד את קובץ ה-JAR של המחבר ממיקום Cloud Storage
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.מחליפים את
SCALA_VERSIONב-2.12או ב-2.13, שהן הגרסאות היחידות של Scala שנתמכות, ומחליפים אתCONNECTOR_VERSIONבגרסת המחבר שרוצים להשתמש בה.באפשרות Managed Service for Apache Spark cluster או באפשרות Serverless, משתמשים בקובץ ה-JAR העדכני כארטיפקט שאפשר להוסיף לאפליקציות Scala או Java Spark. מידע נוסף על השימוש בקובץ JAR כארטיפקט זמין במאמר ניהול יחסי תלות.
אם שולחים את משימת PySpark אל Managed Service for Apache Spark, משתמשים בדגל
gcloud dataproc jobs submit pyspark --jarsכדי להגדיר את ה-URI למיקום של קובץ ה-JAR ב-Cloud Storage – לדוגמהgs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.
קביעת סוג המחשוב
למשימות קריאה עם תפוקה גבוהה, כמו אלה שמבוצעות על ידי אפליקציות Spark, אפשר להשתמש ב-Data Boost serverless compute כדי למנוע השפעה על אשכולות שרת האפליקציות. כדי להשתמש ב-Data Boost, אפליקציית Spark צריכה להשתמש בגרסה 1.1.0 או בגרסה מתקדמת יותר של מחבר Spark.
כדי להשתמש ב-Data Boost, צריך ליצור פרופיל אפליקציה של Data Boost ואז לספק את מזהה פרופיל האפליקציה עבור האפשרות spark.bigtable.app_profile.id Spark כשמוסיפים את ההגדרה של Bigtable לאפליקציית Spark. אם כבר יצרתם פרופיל אפליקציה למשימות קריאה של Spark ואתם רוצים להמשיך להשתמש בו בלי לשנות את קוד האפליקציה, אתם יכולים להמיר את פרופיל האפליקציה לפרופיל אפליקציה של Data Boost. מידע נוסף זמין במאמר המרת פרופיל של אפליקציה.
מידע נוסף זמין במאמר סקירה כללית על Bigtable Data Boost.
לגבי משימות שכוללות קריאה וכתיבה, אפשר להשתמש בצמתי האשכול של המופע לחישובים על ידי ציון פרופיל אפליקציה רגיל בבקשה.
מגדירים או יוצרים פרופיל אפליקציה לשימוש
אם לא מציינים מזהה של פרופיל אפליקציה, המחבר משתמש בפרופיל האפליקציה שמוגדר כברירת מחדל.
מומלץ להשתמש בפרופיל אפליקציה ייחודי לכל אפליקציה שמפעילים, כולל אפליקציית Spark. מידע נוסף על הסוגים וההגדרות של פרופילים של אפליקציות זמין במאמר סקירה כללית על פרופילים של אפליקציות. הוראות מפורטות מופיעות במאמר יצירה והגדרה של פרופילים לאפליקציות.
הוספת הגדרת Bigtable לאפליקציית Spark
באפליקציית Spark, מוסיפים את האפשרויות של Spark שמאפשרות אינטראקציה עם Bigtable.
אפשרויות Spark נתמכות
משתמשים באפשרויות של Spark שזמינות כחלק מחבילת com.google.cloud.spark.bigtable.
| שם האפשרות | חובה | ערך ברירת המחדל | משמעות |
|---|---|---|---|
spark.bigtable.project.id |
כן | לא רלוונטי | מגדירים את מזהה הפרויקט ב-Bigtable. |
spark.bigtable.instance.id |
כן | לא רלוונטי | מגדירים את מזהה המכונה של Bigtable. |
catalog |
כן | לא רלוונטי | מגדירים את פורמט ה-JSON שמציין את פורמט ההמרה בין סכימת ה-SQL של DataFrame לבין סכימת הטבלה של Bigtable. מידע נוסף זמין במאמר בנושא יצירת מטא-נתונים של טבלה בפורמט JSON. |
spark.bigtable.app_profile.id |
לא | default |
מגדירים את מזהה פרופיל האפליקציה של Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
לא | השעה הנוכחית במערכת | מגדירים את חותמת הזמן במילישניות שבה רוצים להשתמש כשכותבים DataFrame ל-Bigtable. שימו לב: מכיוון שכל השורות ב-DataFrame משתמשות באותה חותמת זמן, שורות עם אותה עמודה של מפתח שורה ב-DataFrame נשמרות כגרסה יחידה ב-Bigtable כי יש להן אותה חותמת זמן. |
spark.bigtable.create.new.table |
לא | false |
מגדירים את הערך true כדי ליצור טבלה חדשה לפני הכתיבה ל-Bigtable. |
spark.bigtable.read.timerange.start.milliseconds או spark.bigtable.read.timerange.end.milliseconds |
לא | לא רלוונטי | מגדירים חותמות זמן (באלפיות שנייה מאז ראשית זמן יוניקס) כדי לסנן תאים עם תאריך התחלה ותאריך סיום ספציפיים, בהתאמה. |
spark.bigtable.push.down.row.key.filters |
לא | true |
הגדרה לערך true כדי לאפשר סינון פשוט של מפתחות שורות בצד השרת. סינון של מפתחות שורות מורכבים מיושם בצד הלקוח.מידע נוסף זמין במאמר קריאת שורה ספציפית ב-DataFrame באמצעות מסנן. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
לא | 30 דקות | מגדירים את משך הזמן הקצוב לתפוגה לניסיון קריאת שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
לא | 12 שעות | מגדירים את משך הזמן הקצוב הכולל לתפוגה לניסיון קריאת שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
לא | דקה | מגדירים את משך הזמן הקצוב לתפוגה של ניסיון לשנות שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
לא | 10 דקות | מגדירים את משך הזמן של total timeout לניסיון של שינוי שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java. |
spark.bigtable.batch.mutate.size |
לא | 100 |
מגדירים את מספר המוטציות בכל קבוצה. הערך המקסימלי שאפשר להגדיר הוא 100000. |
spark.bigtable.enable.batch_mutate.flow_control |
לא | false |
מגדירים את הערך true כדי להפעיל בקרה על זרימת נתונים לשינויים בקבוצות. |
יצירת מטא-נתונים של טבלה בפורמט JSON
צריך להמיר את פורמט הטבלה של Spark SQL DataFrames לטבלת Bigtable באמצעות מחרוזת בפורמט JSON. פורמט ה-JSON של המחרוזת הזו מאפשר להתאים את פורמט הנתונים ל-Bigtable. אפשר להעביר את פורמט ה-JSON בקוד האפליקציה באמצעות האפשרות .option("catalog", catalog_json_string).
לדוגמה, נניח שיש לכם טבלת DataFrame וטבלת Bigtable תואמת.
בדוגמה הזו, העמודות name ו-birthYear ב-DataFrame מקובצות יחד תחת קבוצת העמודות info, והשמות שלהן משתנים ל-name ו-birth_year, בהתאמה. באופן דומה, העמודה address מאוחסנת תחת קבוצת העמודות location עם אותו שם עמודה. העמודה id מ-DataFrame מומרת למפתח השורה ב-Bigtable.
למפתחות השורה אין שם עמודה ייעודי ב-Bigtable, ובדוגמה הזו, id_rowkey משמש רק כדי לציין למחבר שזו עמודת מפתח השורה. אתם יכולים להשתמש בכל שם לעמודה של מפתח השורה, אבל חשוב לוודא שאתם משתמשים באותו שם כשאתם מצהירים על השדה "rowkey":"column_name" בפורמט JSON.
| DataFrame | Bigtable table = t1 | |||||||
| עמודות | מפתח שורה | משפחות עמודות | ||||||
| מידע | מיקום | |||||||
| עמודות | עמודות | |||||||
| id | name | birthYear | address | id_rowkey | name | birth_year | address | |
הפורמט של קטלוג ב-JSON הוא:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
המפתחות והערכים שמשמשים בפורמט JSON הם:
| מפתח קטלוג | ערך הקטלוג | פורמט JSON |
|---|---|---|
| טבלה | שם הטבלה ב-Bigtable. | "table":{"name":"t1"}אם הטבלה לא קיימת, משתמשים בפונקציה .option("spark.bigtable.create.new.table", "true") כדי ליצור טבלה. |
| rowkey | שם העמודה שתשמש כמפתח השורה ב-Bigtable. מוודאים ששם העמודה של DataFrame משמש כמפתח השורה – לדוגמה, id_rowkey. אפשר להשתמש גם במפתחות מורכבים כמפתחות שורה. לדוגמה, "rowkey":"name:address". הגישה הזו עלולה לגרום לכך שמפתחות השורות ידרשו סריקה מלאה של הטבלה לכל בקשות הקריאה. |
"rowkey":"id_rowkey", |
| עמודות | מיפוי של כל עמודה ב-DataFrame לקבוצת העמודות ("cf") ולשם העמודה ("col") המתאימים ב-Bigtable. שם העמודה יכול להיות שונה משם העמודה בטבלת DataFrame. סוגי הנתונים הנתמכים כוללים string, long ו-binary. |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" בדוגמה הזו, id_rowkey הוא מפתח השורה, ו-info ו-location הם משפחות העמודות. |
סוגי נתונים נתמכים
המחבר תומך בשימוש בסוגים string, long ו-binary (מערך בייטים) בקטלוג. עד שנוסיף תמיכה בסוגים אחרים כמו intו-float, תוכלו להמיר באופן ידני סוגי נתונים כאלה למערכי בייטים (BinaryType ב-Spark SQL) לפני שתשתמשו במחבר כדי לכתוב אותם ב-Bigtable.
בנוסף, אפשר להשתמש ב-Avro כדי לבצע סריאליזציה של סוגים מורכבים, כמו ArrayType. מידע נוסף זמין במאמר סדרת נתונים מורכבים באמצעות Apache Avro.
כתיבה ל-Bigtable
משתמשים בפונקציה .write() ובאפשרויות הנתמכות כדי לכתוב את הנתונים ב-Bigtable.
Java
הקוד הבא ממאגר GitHub משתמש ב-Java וב-Maven כדי לכתוב ל-Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
הקוד הבא ממאגר GitHub משתמש ב-Python כדי לכתוב ל-Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
קריאה מ-Bigtable
כדי לבדוק אם הטבלה יובאה ל-Bigtable, משתמשים בפונקציה .read().
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
קומפילציה של הפרויקט
יוצרים את קובץ ה-JAR שמשמש להרצת משימה באחד מהמקרים הבאים: אשכול Managed Service for Apache Spark, Managed Service for Apache Spark Serverless או מופע Spark מקומי. אפשר לקמפל את קובץ ה-JAR באופן מקומי ואז להשתמש בו כדי לשלוח עבודה. הנתיב לקובץ ה-JAR המהודר מוגדר כמשתנה הסביבה PATH_TO_COMPILED_JAR כששולחים עבודה.
השלב הזה לא רלוונטי לאפליקציות PySpark.
ניהול יחסי התלות
מחבר Bigtable Spark תומך בכלי ניהול התלות הבאים:
קומפילציה של קובץ ה-JAR
Maven
מוסיפים את התלות
spark-bigtableלקובץ pom.xml.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>מוסיפים את Maven Shade plugin לקובץ
pom.xmlכדי ליצור uber JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>מריצים את הפקודה
mvn clean installכדי ליצור קובץ JAR.
sbt
מוסיפים את התלות
spark-bigtableלקובץbuild.sbt:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
מוסיפים את התוסף
sbt-assemblyלקובץproject/plugins.sbtאוproject/assembly.sbtכדי ליצור קובץ Uber JAR.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
מריצים את הפקודה
sbt clean assemblyכדי ליצור את קובץ ה-JAR.
Gradle
מוסיפים את התלות
spark-bigtableלקובץbuild.gradle.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
מוסיפים את הפלאגין Shadow לקובץ
build.gradleכדי ליצור קובץ JAR גדול:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
מידע נוסף על הגדרות ועל קומפילציה של JAR זמין במסמכי התיעוד של Shadow plugin.
שליחת משרה
שולחים משימת Spark באמצעות Managed Service for Apache Spark, Managed Service for Apache Spark Serverless או מופע Spark מקומי כדי להפעיל את האפליקציה.
הגדרת סביבת זמן ריצה
מגדירים את משתני הסביבה הבאים.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Managed Service for Apache Spark Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
מחליפים את מה שכתוב בשדות הבאים:
- PROJECT_ID: המזהה הקבוע של פרויקט Bigtable.
- INSTANCE_ID: המזהה הקבוע של מופע Bigtable.
- TABLE_NAME: המזהה הקבוע של הטבלה.
- DATAPROC_CLUSTER: המזהה הקבוע של אשכול Managed Service for Apache Spark.
- DATAPROC_REGION: האזור של Managed Service for Apache Spark שמכיל אחד מהאשכולות במופע של Managed Service for Apache Spark, לדוגמה,
northamerica-northeast2. - DATAPROC_ZONE: האזור שבו פועל אשכול Managed Service for Apache Spark.
- SUBNET: נתיב המשאב המלא של תת-הרשת.
- GCS_BUCKET_NAME: הקטגוריה של Cloud Storage שאליה מעלים את התלויות של עומס העבודה של Spark.
- PATH_TO_COMPILED_JAR: הנתיב המלא או היחסי לקובץ ה-JAR שעבר קומפילציה. לדוגמה,
/path/to/project/root/target/<compiled_JAR_name>ב-Maven. - GCS_PATH_TO_CONNECTOR_JAR: קטגוריית Cloud Storage
gs://spark-lib/bigtable, שבה נמצא הקובץspark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar. - PATH_TO_PYTHON_FILE: ביישומי PySpark, הנתיב לקובץ Python שישמש לכתיבת נתונים ב-Bigtable ולקריאת נתונים מ-Bigtable.
- LOCAL_PATH_TO_CONNECTOR_JAR: ביישומי PySpark, הנתיב לקובץ ה-JAR של מחבר Bigtable Spark שהורד.
שליחת משימה ב-Spark
כדי להעלות נתונים ל-Bigtable, מריצים משימת Spark במופעים של Managed Service for Apache Spark או בהגדרת Spark המקומית.
אשכול Managed Service for Apache Spark
משתמשים בקובץ ה-JAR המהודר ויוצרים משימה של אשכול Managed Service for Apache Spark שקוראת נתונים מ-Bigtable וכותבת נתונים ל-Bigtable.
יוצרים אשכול של Managed Service for Apache Spark. בדוגמה הבאה מוצגת פקודה לדוגמה ליצירת אשכול Managed Service for Apache Spark v2.0 עם Debian 10, שני צמתי עובד והגדרות ברירת מחדל.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_IDשליחת משרה.
Scala/Java
בדוגמה הבאה מוצג המחלקה
spark.bigtable.example.WordCountשכוללת את הלוגיקה ליצירת טבלת בדיקה ב-DataFrame, כתיבת הטבלה ל-Bigtable ואז ספירת מספר המילים בטבלה.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Managed Service for Apache Spark Serverless
משתמשים בקובץ ה-JAR המהודר ויוצרים משימה של Managed Service for Apache Spark שקוראת נתונים מ-Bigtable וכותבת נתונים ל-Bigtable באמצעות מופע Serverless של Managed Service for Apache Spark.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
ניצוץ מקומי
משתמשים בקובץ ה-JAR שהורד ויוצרים משימת Spark שקוראת נתונים מ-Bigtable וכותבת נתונים ל-Bigtable באמצעות מופע Spark מקומי. אפשר גם להשתמש באמולטור של Bigtable כדי לשלוח את עבודת Spark.
שימוש באמולטור Bigtable
אם מחליטים להשתמש באמולטור של Bigtable, פועלים לפי השלבים הבאים:
מריצים את הפקודה הבאה כדי להפעיל את האמולטור:
gcloud beta emulators bigtable startכברירת מחדל, האמולטור בוחר באפשרות
localhost:8086.מגדירים את משתנה הסביבה
BIGTABLE_EMULATOR_HOST:export BIGTABLE_EMULATOR_HOST=localhost:8086
מידע נוסף על השימוש באמולטור של Bigtable זמין במאמר בדיקה באמצעות האמולטור.
שליחת משימה ב-Spark
משתמשים בפקודה spark-submit כדי לשלוח משימת Spark, בלי קשר לשאלה אם אתם משתמשים באמולטור Bigtable מקומי.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
אימות הנתונים בטבלה
מריצים את הפקודה הבאה של cbt CLI
כדי לוודא שהנתונים נכתבים ב-Bigtable. cbt CLI הוא רכיב של Google Cloud CLI. מידע נוסף מפורט
בסקירה הכללית בנושא CLI של cbt.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
פתרונות נוספים
להשתמש במחבר Bigtable Spark לפתרונות ספציפיים, כמו סריאליזציה של סוגים מורכבים של Spark SQL, קריאה של שורות ספציפיות ויצירה של מדדים בצד הלקוח.
קריאת שורה ספציפית ב-DataFrame באמצעות מסנן
כשמשתמשים ב-DataFrames כדי לקרוא מ-Bigtable, אפשר לציין מסנן כדי לקרוא רק שורות ספציפיות. מסננים פשוטים כמו ==, <= ו-startsWith בעמודה של מפתח השורה מוחלים בצד השרת כדי למנוע סריקה של כל הטבלה. מסננים במפתחות שורות מורכבים או מסננים מורכבים כמו המסנן LIKE בעמודה של מפתח השורה מוחלים בצד הלקוח.
אם אתם קוראים טבלאות גדולות, מומלץ להשתמש במסננים פשוטים של מפתחות שורות כדי להימנע מסריקה של כל הטבלה. בדוגמה הבאה אפשר לראות איך קוראים נתונים באמצעות מסנן פשוט. מוודאים שבמסנן Spark משתמשים בשם של עמודת DataFrame שהומר למפתח השורה:
dataframe.filter("id == 'some_id'").show()
כשמחילים מסנן, משתמשים בשם העמודה של DataFrame במקום בשם העמודה של טבלת Bigtable.
סריאליזציה של סוגי נתונים מורכבים באמצעות Apache Avro
מחבר Bigtable Spark תומך בשימוש ב-Apache Avro כדי לבצע סריאליזציה של סוגי Spark SQL מורכבים, כמו ArrayType, MapType או StructType. Apache Avro מספק סריאליזציה של נתונים עבור נתוני רשומות, שמשמשת בדרך כלל לעיבוד ולאחסון של מבני נתונים מורכבים.
משתמשים בתחביר כמו "avro":"avroSchema" כדי לציין שעמודה ב-Bigtable צריכה להיות מקודדת באמצעות Avro. אחר כך אפשר להשתמש ב-.option("avroSchema", avroSchemaString) כשקוראים מ-Bigtable או כותבים ל-Bigtable כדי לציין את סכמת Avro שמתאימה לעמודה הזו בפורמט מחרוזת. אפשר להשתמש בשמות שונים של אפשרויות – לדוגמה, "anotherAvroSchema" לעמודות שונות ולהעביר סכימות Avro לכמה עמודות.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
שימוש במדדים בצד הלקוח
מחבר Bigtable Spark מבוסס על Bigtable Client for Java, ולכן מדדים בצד הלקוח מופעלים בתוך המחבר כברירת מחדל. מידע נוסף על גישה למדדים האלה ועל פירוש שלהם זמין במאמר בנושא מדדים בצד הלקוח.
שימוש בלקוח Bigtable ל-Java עם פונקציות RDD ברמה נמוכה
מחבר Bigtable Spark מבוסס על לקוח Bigtable ל-Java, ולכן אפשר להשתמש ישירות בלקוח באפליקציות Spark ולבצע בקשות קריאה או כתיבה מבוזרות בפונקציות RDD ברמה נמוכה, כמו mapPartitions ו-foreachPartition.
כדי להשתמש בלקוח Bigtable עבור מחלקות Java, מוסיפים את התחילית com.google.cloud.spark.bigtable.repackaged לשמות החבילות. לדוגמה, במקום להשתמש בשם המחלקה com.google.cloud.bigtable.data.v2.BigtableDataClient, צריך להשתמש ב-com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient.
מידע נוסף על לקוח Bigtable ל-Java זמין במאמר בנושא לקוח Bigtable ל-Java.
המאמרים הבאים
- איך משפרים את הביצועים של עבודת Spark ב-Managed Service for Apache Spark
- משתמשים בclasses מלקוח Bigtable ל-Java עם מחבר Bigtable Spark.