שימוש במחבר Bigtable Spark

מחבר Bigtable Spark מאפשר לכם לקרוא ולכתוב נתונים אל Bigtable וממנו. אפשר לקרוא נתונים מאפליקציית Spark באמצעות Spark SQL ו-DataFrames. הפעולות הבאות ב-Bigtable נתמכות באמצעות מחבר Bigtable Spark:

  • כתיבת נתונים
  • קריאת נתונים
  • יצירת טבלה חדשה

במאמר הזה מוסבר איך להמיר טבלה של Spark SQL DataFrames לטבלה של Bigtable, ואז לקמפל וליצור קובץ JAR כדי לשלוח עבודת Spark.

סטטוס התמיכה ב-Spark וב-Scala

מחבר Bigtable Spark תומך בגרסאות הבאות של Scala:

מחבר Bigtable Spark תומך בגרסאות הבאות של Spark:

מחבר Bigtable Spark תומך בגרסאות הבאות של Managed Service for Apache Spark:

חישוב העלויות

אם תחליטו להשתמש באחד מהרכיבים הבאים של Google Cloud, השימוש במשאבים יחויב:

  • ‫Bigtable (לא נגבה תשלום על שימוש באמולטור Bigtable)
  • Managed Service for Apache Spark
  • Cloud Storage

התמחור של Managed Service for Apache Spark חל על השימוש ב-Managed Service for Apache Spark באשכולות של Compute Engine. התמחור של Managed Service for Apache Spark Serverless חל על עומסי עבודה ועל סשנים שמופעלים ב-Managed Service for Apache Spark Serverless for Spark.

כדי ליצור הערכת עלויות בהתאם לשימוש החזוי, אתם יכולים להשתמש במחשבון התמחור.

לפני שמתחילים

לפני שמשתמשים במחבר Bigtable Spark, צריך לוודא שמתקיימות הדרישות המוקדמות הבאות.

התפקידים הנדרשים

כדי לקבל את ההרשאות שדרושות לשימוש ב-Bigtable Spark connector, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בפרויקט:

  • אדמין של Bigtable‏ (roles/bigtable.admin)(אופציונלי): מאפשר לקרוא או לכתוב נתונים וליצור טבלה חדשה.
  • משתמש Bigtable‏ (roles/bigtable.user): מאפשר לקרוא או לכתוב נתונים, אבל לא מאפשר ליצור טבלה חדשה.

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

אם אתם משתמשים ב-Managed Service for Apache Spark או ב-Cloud Storage, יכול להיות שתידרשו הרשאות נוספות. מידע נוסף זמין במאמרים הרשאות בשירות המנוהל ל-Apache Spark והרשאות ב-Cloud Storage.

הגדרת Spark

בנוסף ליצירת מופע Bigtable, צריך גם להגדיר את מופע Spark. אפשר לעשות את זה באופן מקומי או לבחור באחת מהאפשרויות הבאות כדי להשתמש ב-Spark עם Managed Service for Apache Spark:

  • אשכול Managed Service for Apache Spark
  • Managed Service for Apache Spark Serverless

מידע נוסף על בחירה בין אשכול Managed Service for Apache Spark לבין אפשרות ללא שרת זמין במאמר Managed Service for Apache Spark Serverless for Spark בהשוואה ל-Managed Service for Apache Spark ב-Compute Engine .

הורדת קובץ ה-JAR של המחבר

אפשר למצוא את קוד המקור של מחבר Bigtable Spark עם דוגמאות במאגר GitHub של מחבר Bigtable Spark.

בהתאם להגדרה של Spark, אפשר לגשת לקובץ ה-JAR באופן הבא:

  • אם אתם מריצים את PySpark באופן מקומי, אתם צריכים להוריד את קובץ ה-JAR של המחבר ממיקום Cloud Storage‏ gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

    מחליפים את SCALA_VERSION ב-2.12 או ב-2.13, שהן הגרסאות היחידות של Scala שנתמכות, ומחליפים את CONNECTOR_VERSION בגרסת המחבר שרוצים להשתמש בה.

  • באפשרות Managed Service for Apache Spark cluster או באפשרות Serverless, משתמשים בקובץ ה-JAR העדכני כארטיפקט שאפשר להוסיף לאפליקציות Scala או Java Spark. מידע נוסף על השימוש בקובץ JAR כארטיפקט זמין במאמר ניהול יחסי תלות.

  • אם שולחים את משימת PySpark אל Managed Service for Apache Spark, משתמשים בדגל gcloud dataproc jobs submit pyspark --jars כדי להגדיר את ה-URI למיקום של קובץ ה-JAR ב-Cloud Storage – לדוגמה gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

קביעת סוג המחשוב

למשימות קריאה עם תפוקה גבוהה, כמו אלה שמבוצעות על ידי אפליקציות Spark, אפשר להשתמש ב-Data Boost serverless compute כדי למנוע השפעה על אשכולות שרת האפליקציות. כדי להשתמש ב-Data Boost, אפליקציית Spark צריכה להשתמש בגרסה 1.1.0 או בגרסה מתקדמת יותר של מחבר Spark.

כדי להשתמש ב-Data Boost, צריך ליצור פרופיל אפליקציה של Data Boost ואז לספק את מזהה פרופיל האפליקציה עבור האפשרות spark.bigtable.app_profile.id Spark כשמוסיפים את ההגדרה של Bigtable לאפליקציית Spark. אם כבר יצרתם פרופיל אפליקציה למשימות קריאה של Spark ואתם רוצים להמשיך להשתמש בו בלי לשנות את קוד האפליקציה, אתם יכולים להמיר את פרופיל האפליקציה לפרופיל אפליקציה של Data Boost. מידע נוסף זמין במאמר המרת פרופיל של אפליקציה.

מידע נוסף זמין במאמר סקירה כללית על Bigtable Data Boost.

לגבי משימות שכוללות קריאה וכתיבה, אפשר להשתמש בצמתי האשכול של המופע לחישובים על ידי ציון פרופיל אפליקציה רגיל בבקשה.

מגדירים או יוצרים פרופיל אפליקציה לשימוש

אם לא מציינים מזהה של פרופיל אפליקציה, המחבר משתמש בפרופיל האפליקציה שמוגדר כברירת מחדל.

מומלץ להשתמש בפרופיל אפליקציה ייחודי לכל אפליקציה שמפעילים, כולל אפליקציית Spark. מידע נוסף על הסוגים וההגדרות של פרופילים של אפליקציות זמין במאמר סקירה כללית על פרופילים של אפליקציות. הוראות מפורטות מופיעות במאמר יצירה והגדרה של פרופילים לאפליקציות.

הוספת הגדרת Bigtable לאפליקציית Spark

באפליקציית Spark, מוסיפים את האפשרויות של Spark שמאפשרות אינטראקציה עם Bigtable.

אפשרויות Spark נתמכות

משתמשים באפשרויות של Spark שזמינות כחלק מחבילת com.google.cloud.spark.bigtable.

שם האפשרות חובה ערך ברירת המחדל משמעות
spark.bigtable.project.id כן לא רלוונטי מגדירים את מזהה הפרויקט ב-Bigtable.
spark.bigtable.instance.id כן לא רלוונטי מגדירים את מזהה המכונה של Bigtable.
catalog כן לא רלוונטי מגדירים את פורמט ה-JSON שמציין את פורמט ההמרה בין סכימת ה-SQL של DataFrame לבין סכימת הטבלה של Bigtable.

מידע נוסף זמין במאמר בנושא יצירת מטא-נתונים של טבלה בפורמט JSON.
spark.bigtable.app_profile.id לא default מגדירים את מזהה פרופיל האפליקציה של Bigtable.
spark.bigtable.write.timestamp.milliseconds לא השעה הנוכחית במערכת מגדירים את חותמת הזמן במילישניות שבה רוצים להשתמש כשכותבים DataFrame ל-Bigtable.

שימו לב: מכיוון שכל השורות ב-DataFrame משתמשות באותה חותמת זמן, שורות עם אותה עמודה של מפתח שורה ב-DataFrame נשמרות כגרסה יחידה ב-Bigtable כי יש להן אותה חותמת זמן.
spark.bigtable.create.new.table לא false מגדירים את הערך true כדי ליצור טבלה חדשה לפני הכתיבה ל-Bigtable.
spark.bigtable.read.timerange.start.milliseconds או spark.bigtable.read.timerange.end.milliseconds לא לא רלוונטי מגדירים חותמות זמן (באלפיות שנייה מאז ראשית זמן יוניקס) כדי לסנן תאים עם תאריך התחלה ותאריך סיום ספציפיים, בהתאמה.
spark.bigtable.push.down.row.key.filters לא true הגדרה לערך true כדי לאפשר סינון פשוט של מפתחות שורות בצד השרת. סינון של מפתחות שורות מורכבים מיושם בצד הלקוח.

מידע נוסף זמין במאמר קריאת שורה ספציפית ב-DataFrame באמצעות מסנן.
spark.bigtable.read.rows.attempt.timeout.milliseconds לא ‫30 דקות מגדירים את משך הזמן הקצוב לתפוגה לניסיון קריאת שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java.
spark.bigtable.read.rows.total.timeout.milliseconds לא ‫12 שעות מגדירים את משך הזמן הקצוב הכולל לתפוגה לניסיון קריאת שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java.
spark.bigtable.mutate.rows.attempt.timeout.milliseconds לא דקה מגדירים את משך הזמן הקצוב לתפוגה של ניסיון לשנות שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java.
spark.bigtable.mutate.rows.total.timeout.milliseconds לא ‫10 דקות מגדירים את משך הזמן של total timeout לניסיון של שינוי שורות שמתאים למחיצה אחת של DataFrame בלקוח Bigtable ל-Java.
spark.bigtable.batch.mutate.size לא 100 מגדירים את מספר המוטציות בכל קבוצה. הערך המקסימלי שאפשר להגדיר הוא 100000.
spark.bigtable.enable.batch_mutate.flow_control לא false מגדירים את הערך true כדי להפעיל בקרה על זרימת נתונים לשינויים בקבוצות.

יצירת מטא-נתונים של טבלה בפורמט JSON

צריך להמיר את פורמט הטבלה של Spark SQL DataFrames לטבלת Bigtable באמצעות מחרוזת בפורמט JSON. פורמט ה-JSON של המחרוזת הזו מאפשר להתאים את פורמט הנתונים ל-Bigtable. אפשר להעביר את פורמט ה-JSON בקוד האפליקציה באמצעות האפשרות .option("catalog", catalog_json_string).

לדוגמה, נניח שיש לכם טבלת DataFrame וטבלת Bigtable תואמת.

בדוגמה הזו, העמודות name ו-birthYear ב-DataFrame מקובצות יחד תחת קבוצת העמודות info, והשמות שלהן משתנים ל-name ו-birth_year, בהתאמה. באופן דומה, העמודה address מאוחסנת תחת קבוצת העמודות location עם אותו שם עמודה. העמודה id מ-DataFrame מומרת למפתח השורה ב-Bigtable.

למפתחות השורה אין שם עמודה ייעודי ב-Bigtable, ובדוגמה הזו, id_rowkey משמש רק כדי לציין למחבר שזו עמודת מפתח השורה. אתם יכולים להשתמש בכל שם לעמודה של מפתח השורה, אבל חשוב לוודא שאתם משתמשים באותו שם כשאתם מצהירים על השדה "rowkey":"column_name" בפורמט JSON.

DataFrame Bigtable table = t1
עמודות מפתח שורה משפחות עמודות
מידע מיקום
עמודות עמודות
id name birthYear address id_rowkey name birth_year address

הפורמט של קטלוג ב-JSON הוא:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

המפתחות והערכים שמשמשים בפורמט JSON הם:

מפתח קטלוג ערך הקטלוג פורמט JSON
טבלה שם הטבלה ב-Bigtable. "table":{"name":"t1"}

אם הטבלה לא קיימת, משתמשים בפונקציה .option("spark.bigtable.create.new.table", "true") כדי ליצור טבלה.
rowkey שם העמודה שתשמש כמפתח השורה ב-Bigtable. מוודאים ששם העמודה של DataFrame משמש כמפתח השורה – לדוגמה, id_rowkey.

אפשר להשתמש גם במפתחות מורכבים כמפתחות שורה. לדוגמה, "rowkey":"name:address". הגישה הזו עלולה לגרום לכך שמפתחות השורות ידרשו סריקה מלאה של הטבלה לכל בקשות הקריאה.
"rowkey":"id_rowkey",
עמודות מיפוי של כל עמודה ב-DataFrame לקבוצת העמודות ("cf") ולשם העמודה ("col") המתאימים ב-Bigtable. שם העמודה יכול להיות שונה משם העמודה בטבלת DataFrame. סוגי הנתונים הנתמכים כוללים string,‏ long ו-binary. "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

בדוגמה הזו, id_rowkey הוא מפתח השורה, ו-info ו-location הם משפחות העמודות.

סוגי נתונים נתמכים

המחבר תומך בשימוש בסוגים string, ‏long ו-binary (מערך בייטים) בקטלוג. עד שנוסיף תמיכה בסוגים אחרים כמו intו-float, תוכלו להמיר באופן ידני סוגי נתונים כאלה למערכי בייטים (BinaryType ב-Spark SQL) לפני שתשתמשו במחבר כדי לכתוב אותם ב-Bigtable.

בנוסף, אפשר להשתמש ב-Avro כדי לבצע סריאליזציה של סוגים מורכבים, כמו ArrayType. מידע נוסף זמין במאמר סדרת נתונים מורכבים באמצעות Apache Avro.

כתיבה ל-Bigtable

משתמשים בפונקציה .write() ובאפשרויות הנתמכות כדי לכתוב את הנתונים ב-Bigtable.

Java

הקוד הבא ממאגר GitHub משתמש ב-Java וב-Maven כדי לכתוב ל-Bigtable.

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

הקוד הבא ממאגר GitHub משתמש ב-Python כדי לכתוב ל-Bigtable.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

קריאה מ-Bigtable

כדי לבדוק אם הטבלה יובאה ל-Bigtable, משתמשים בפונקציה .read().

Java

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

קומפילציה של הפרויקט

יוצרים את קובץ ה-JAR שמשמש להרצת משימה באחד מהמקרים הבאים: אשכול Managed Service for Apache Spark,‏ Managed Service for Apache Spark Serverless או מופע Spark מקומי. אפשר לקמפל את קובץ ה-JAR באופן מקומי ואז להשתמש בו כדי לשלוח עבודה. הנתיב לקובץ ה-JAR המהודר מוגדר כמשתנה הסביבה PATH_TO_COMPILED_JAR כששולחים עבודה.

השלב הזה לא רלוונטי לאפליקציות PySpark.

ניהול יחסי התלות

מחבר Bigtable Spark תומך בכלי ניהול התלות הבאים:

קומפילציה של קובץ ה-JAR

Maven

  1. מוסיפים את התלות spark-bigtable לקובץ pom.xml.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. מוסיפים את Maven Shade plugin לקובץ pom.xml כדי ליצור uber JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. מריצים את הפקודה mvn clean install כדי ליצור קובץ JAR.

sbt

  1. מוסיפים את התלות spark-bigtable לקובץ build.sbt:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
  2. מוסיפים את התוסף sbt-assembly לקובץ project/plugins.sbt או project/assembly.sbt כדי ליצור קובץ Uber JAR.

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. מריצים את הפקודה sbt clean assembly כדי ליצור את קובץ ה-JAR.

Gradle

  1. מוסיפים את התלות spark-bigtable לקובץ build.gradle.

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. מוסיפים את הפלאגין Shadow לקובץ build.gradle כדי ליצור קובץ JAR גדול:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
  3. מידע נוסף על הגדרות ועל קומפילציה של JAR זמין במסמכי התיעוד של Shadow plugin.

שליחת משרה

שולחים משימת Spark באמצעות Managed Service for Apache Spark,‏ Managed Service for Apache Spark Serverless או מופע Spark מקומי כדי להפעיל את האפליקציה.

הגדרת סביבת זמן ריצה

מגדירים את משתני הסביבה הבאים.

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Managed Service for Apache Spark Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: המזהה הקבוע של פרויקט Bigtable.
  • INSTANCE_ID: המזהה הקבוע של מופע Bigtable.
  • TABLE_NAME: המזהה הקבוע של הטבלה.
  • DATAPROC_CLUSTER: המזהה הקבוע של אשכול Managed Service for Apache Spark.
  • DATAPROC_REGION: האזור של Managed Service for Apache Spark שמכיל אחד מהאשכולות במופע של Managed Service for Apache Spark, לדוגמה, northamerica-northeast2.
  • DATAPROC_ZONE: האזור שבו פועל אשכול Managed Service for Apache Spark.
  • SUBNET: נתיב המשאב המלא של תת-הרשת.
  • GCS_BUCKET_NAME: הקטגוריה של Cloud Storage שאליה מעלים את התלויות של עומס העבודה של Spark.
  • PATH_TO_COMPILED_JAR: הנתיב המלא או היחסי לקובץ ה-JAR שעבר קומפילציה. לדוגמה, /path/to/project/root/target/<compiled_JAR_name> ב-Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: קטגוריית Cloud Storage‏ gs://spark-lib/bigtable, שבה נמצא הקובץ spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.
  • PATH_TO_PYTHON_FILE: ביישומי PySpark, הנתיב לקובץ Python שישמש לכתיבת נתונים ב-Bigtable ולקריאת נתונים מ-Bigtable.
  • LOCAL_PATH_TO_CONNECTOR_JAR: ביישומי PySpark, הנתיב לקובץ ה-JAR של מחבר Bigtable Spark שהורד.

שליחת משימה ב-Spark

כדי להעלות נתונים ל-Bigtable, מריצים משימת Spark במופעים של Managed Service for Apache Spark או בהגדרת Spark המקומית.

אשכול Managed Service for Apache Spark

משתמשים בקובץ ה-JAR המהודר ויוצרים משימה של אשכול Managed Service for Apache Spark שקוראת נתונים מ-Bigtable וכותבת נתונים ל-Bigtable.

  1. יוצרים אשכול של Managed Service for Apache Spark. בדוגמה הבאה מוצגת פקודה לדוגמה ליצירת אשכול Managed Service for Apache Spark v2.0 עם Debian 10, שני צמתי עובד והגדרות ברירת מחדל.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. שליחת משרה.

    Scala/Java

    בדוגמה הבאה מוצג המחלקה spark.bigtable.example.WordCount שכוללת את הלוגיקה ליצירת טבלת בדיקה ב-DataFrame, כתיבת הטבלה ל-Bigtable ואז ספירת מספר המילים בטבלה.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Managed Service for Apache Spark Serverless

משתמשים בקובץ ה-JAR המהודר ויוצרים משימה של Managed Service for Apache Spark שקוראת נתונים מ-Bigtable וכותבת נתונים ל-Bigtable באמצעות מופע Serverless של Managed Service for Apache Spark.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

ניצוץ מקומי

משתמשים בקובץ ה-JAR שהורד ויוצרים משימת Spark שקוראת נתונים מ-Bigtable וכותבת נתונים ל-Bigtable באמצעות מופע Spark מקומי. אפשר גם להשתמש באמולטור של Bigtable כדי לשלוח את עבודת Spark.

שימוש באמולטור Bigtable

אם מחליטים להשתמש באמולטור של Bigtable, פועלים לפי השלבים הבאים:

  1. מריצים את הפקודה הבאה כדי להפעיל את האמולטור:

    gcloud beta emulators bigtable start
    

    כברירת מחדל, האמולטור בוחר באפשרות localhost:8086.

  2. מגדירים את משתנה הסביבה BIGTABLE_EMULATOR_HOST:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. שולחים את עבודת Spark.

מידע נוסף על השימוש באמולטור של Bigtable זמין במאמר בדיקה באמצעות האמולטור.

שליחת משימה ב-Spark

משתמשים בפקודה spark-submit כדי לשלוח משימת Spark, בלי קשר לשאלה אם אתם משתמשים באמולטור Bigtable מקומי.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

אימות הנתונים בטבלה

מריצים את הפקודה הבאה של cbt CLI כדי לוודא שהנתונים נכתבים ב-Bigtable. ‫cbt CLI הוא רכיב של Google Cloud CLI. מידע נוסף מפורט בסקירה הכללית בנושא CLI של cbt.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

פתרונות נוספים

להשתמש במחבר Bigtable Spark לפתרונות ספציפיים, כמו סריאליזציה של סוגים מורכבים של Spark SQL, קריאה של שורות ספציפיות ויצירה של מדדים בצד הלקוח.

קריאת שורה ספציפית ב-DataFrame באמצעות מסנן

כשמשתמשים ב-DataFrames כדי לקרוא מ-Bigtable, אפשר לציין מסנן כדי לקרוא רק שורות ספציפיות. מסננים פשוטים כמו ==,‏ <= ו-startsWith בעמודה של מפתח השורה מוחלים בצד השרת כדי למנוע סריקה של כל הטבלה. מסננים במפתחות שורות מורכבים או מסננים מורכבים כמו המסנן LIKE בעמודה של מפתח השורה מוחלים בצד הלקוח.

אם אתם קוראים טבלאות גדולות, מומלץ להשתמש במסננים פשוטים של מפתחות שורות כדי להימנע מסריקה של כל הטבלה. בדוגמה הבאה אפשר לראות איך קוראים נתונים באמצעות מסנן פשוט. מוודאים שבמסנן Spark משתמשים בשם של עמודת DataFrame שהומר למפתח השורה:

    dataframe.filter("id == 'some_id'").show()
  

כשמחילים מסנן, משתמשים בשם העמודה של DataFrame במקום בשם העמודה של טבלת Bigtable.

סריאליזציה של סוגי נתונים מורכבים באמצעות Apache Avro

מחבר Bigtable Spark תומך בשימוש ב-Apache Avro כדי לבצע סריאליזציה של סוגי Spark SQL מורכבים, כמו ArrayType,‏ MapType או StructType. ‫Apache Avro מספק סריאליזציה של נתונים עבור נתוני רשומות, שמשמשת בדרך כלל לעיבוד ולאחסון של מבני נתונים מורכבים.

משתמשים בתחביר כמו "avro":"avroSchema" כדי לציין שעמודה ב-Bigtable צריכה להיות מקודדת באמצעות Avro. אחר כך אפשר להשתמש ב-.option("avroSchema", avroSchemaString) כשקוראים מ-Bigtable או כותבים ל-Bigtable כדי לציין את סכמת Avro שמתאימה לעמודה הזו בפורמט מחרוזת. אפשר להשתמש בשמות שונים של אפשרויות – לדוגמה, "anotherAvroSchema" לעמודות שונות ולהעביר סכימות Avro לכמה עמודות.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string&quot;},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

שימוש במדדים בצד הלקוח

מחבר Bigtable Spark מבוסס על Bigtable Client for Java, ולכן מדדים בצד הלקוח מופעלים בתוך המחבר כברירת מחדל. מידע נוסף על גישה למדדים האלה ועל פירוש שלהם זמין במאמר בנושא מדדים בצד הלקוח.

שימוש בלקוח Bigtable ל-Java עם פונקציות RDD ברמה נמוכה

מחבר Bigtable Spark מבוסס על לקוח Bigtable ל-Java, ולכן אפשר להשתמש ישירות בלקוח באפליקציות Spark ולבצע בקשות קריאה או כתיבה מבוזרות בפונקציות RDD ברמה נמוכה, כמו mapPartitions ו-foreachPartition.

כדי להשתמש בלקוח Bigtable עבור מחלקות Java, מוסיפים את התחילית com.google.cloud.spark.bigtable.repackaged לשמות החבילות. לדוגמה, במקום להשתמש בשם המחלקה com.google.cloud.bigtable.data.v2.BigtableDataClient, צריך להשתמש ב-com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient.

מידע נוסף על לקוח Bigtable ל-Java זמין במאמר בנושא לקוח Bigtable ל-Java.

המאמרים הבאים