שימוש במחבר Spark BigQuery

המחבר spark-bigquery-connector משמש עם Apache Spark לקריאה ולכתיבה של נתונים מ-BigQuery ואליו. המחבר משתמש ב-BigQuery Storage API כשקוראים נתונים מ-BigQuery.

במדריך הזה מוסבר על הזמינות של המחבר שהותקן מראש, ומוצגות הוראות להפיכת גרסה ספציפית של מחבר לזמינה לעבודות Spark. בדוגמת הקוד אפשר לראות איך משתמשים במחבר Spark BigQuery באפליקציית Spark.

שימוש במחבר שמותקן מראש

מחבר Spark BigQuery מותקן מראש וזמין למשימות Spark שמופעלות באשכולות Managed Service for Apache Spark שנוצרו עם גרסאות תמונה 2.1 ואילך. גרסת המחבר שמותקנת מראש מופיעה בדפי הגרסאות של תמונת המערכת.

הפיכת גרסה ספציפית של מחבר לזמינה לעבודות Spark

אם רוצים להשתמש בגרסה של מחבר ששונה מגרסה שהותקנה מראש באשכול של גרסת תמונה 2.1 ומעלה, או אם רוצים להתקין את המחבר באשכול של גרסת תמונה שקודמת ל-2.1, צריך לפעול לפי ההוראות שבקטע הזה.

חשוב: גרסה spark-bigquery-connector צריכה להיות תואמת לגרסת תמונת האשכול של Managed Service for Apache Spark. אפשר לעיין בטבלת התאימות של תמונות המחבר ל-Managed Service for Apache Spark.

אשכולות של גרסאות תמונות מ-2.1 ואילך

כשיוצרים אשכול Managed Service for Apache Spark עם גרסת תמונה 2.1 ואילך, צריך לציין את גרסת המחבר כמטא-נתונים של האשכול.

דוגמה ל-CLI של gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

הערות:

  • SPARK_BQ_CONNECTOR_VERSION: ציון גרסת מחבר. גרסאות של מחבר Spark BigQuery מפורטות בדף spark-bigquery-connector/releases ב-GitHub.

    דוגמה:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: מציינים כתובת URL שמפנה לקובץ ה-JAR ב-Cloud Storage. אפשר לציין את כתובת ה-URL של מחבר שמופיע בעמודה link בקטע Downloading and Using the Connector ב-GitHub, או את הנתיב למיקום ב-Cloud Storage שבו שמרתם קובץ jar של מחבר מותאם אישית.

    דוגמאות:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

2.0 וקלאסטרים של גרסאות קודמות של תמונות

אפשר להשתמש במחבר Spark BigQuery באפליקציה באחת מהדרכים הבאות:

  1. כדי להתקין את spark-bigquery-connector בספריית ה-JAR של Spark בכל צומת, משתמשים בפעולת האתחול של Managed Service for Apache Spark connectors כשיוצרים את האשכול.

  2. כששולחים את העבודה לאשכול באמצעות Google Cloud המסוף, gcloud CLI או Managed Service for Apache Spark API, צריך לספק את כתובת ה-URL של קובץ ה-JAR של המחבר.

    המסוף

    משתמשים בפריט Jars files (קבצי JAR) במשימת Spark בדף Submit a job (שליחת משימה) של Managed Service for Apache Spark.

    gcloud

    משתמשים בדגל gcloud dataproc jobs submit spark --jars.

    API

    משתמשים בשדה SparkJob.jarFileUris.

    איך מציינים את קובץ ה-JAR של המחבר כשמריצים משימות Spark באשכולות של גרסת תמונה ישנה יותר מ-2.0

    • מציינים את קובץ ה-JAR של המחבר על ידי החלפת המידע על גרסת המחבר ו-Scala במחרוזת ה-URI הבאה:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • שימוש ב-Scala 2.12 עם גרסאות תמונות של Managed Service for Apache Spark 1.5+
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      דוגמה ל-CLI של gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • שימוש ב-Scala 2.11 עם גרסאות תמונת Managed Service for Apache Spark‏ 1.4 וגרסאות קודמות:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      דוגמה ל-CLI של gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. כוללים את קובץ ה-jar של המחבר באפליקציית Scala או Java Spark כתלות (ראו קומפילציה מול המחבר).

חישוב העלויות

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

  • Managed Service for Apache Spark
  • BigQuery
  • Cloud Storage

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

קריאה וכתיבה של נתונים מ-BigQuery ואליו

בדוגמה הזו, הנתונים נקראים מ-BigQuery לתוך Spark DataFrame כדי לבצע ספירת מילים באמצעות API של מקור נתונים סטנדרטי.

המחבר כותב את הנתונים ל-BigQuery על ידי אחסון כל הנתונים בטבלה זמנית ב-Cloud Storage. לאחר מכן, המערכת מעתיקה את כל הנתונים אל BigQuery בפעולה אחת. המחבר מנסה למחוק את הקבצים הזמניים אחרי שפעולת הטעינה של BigQuery מצליחה, ושוב כשאפליקציית Spark מסתיימת. אם העבודה נכשלת, מסירים את כל הקבצים הזמניים שנותרו ב-Cloud Storage. בדרך כלל, קבצים זמניים של BigQuery נמצאים בתיקייה gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

הגדרת חיוב

כברירת מחדל, הפרויקט שמשויך לפרטי הכניסה או לחשבון השירות מחויב על השימוש ב-API. כדי לחייב פרויקט אחר, מגדירים את ההגדרה הבאה: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

אפשר גם להוסיף אותו לפעולת קריאה או כתיבה, באופן הבא: .option("parentProject", "<BILLED-GCP-PROJECT>").

הרצת הקוד

לפני שמריצים את הדוגמה הזו, צריך ליצור מערך נתונים בשם wordcount_dataset או לשנות את מערך הנתונים של הפלט בקוד למערך נתונים קיים ב-BigQuery בפרויקטGoogle Cloud .

משתמשים בפקודה bq כדי ליצור את wordcount_dataset:

bq mk wordcount_dataset

משתמשים בפקודה Google Cloud CLI כדי ליצור קטגוריה של Cloud Storage, שתשמש לייצוא ל-BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. בודקים את הקוד ומחליפים את placeholder‏ [bucket] בקטגוריה של Cloud Storage שיצרתם קודם.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      spark.read.bigquery("bigquery-public-data:samples.shakespeare")
      .cache()
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .save("wordcount_dataset.wordcount_output"))
  2. הרצת הקוד באשכול
    1. משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark master node
      1. נכנסים לדף Managed Service for Apache Spark Clusters במסוף Google Cloud ולוחצים על שם האשכול הדף Dataproc clusters במסוף Cloud..
      2. בדף >פרטי האשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. אחר כך לוחצים על SSH משמאל לשם של הצומת הראשי של האשכול> הדף &#39;פרטי אשכול Dataproc&#39; ב-Cloud Console.
        חלון דפדפן נפתח בספריית הבית שלכם בצומת הראשי
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. יוצרים את wordcount.scala באמצעות עורך הטקסט vi,‏ vim או nano שהותקן מראש, ואז מדביקים את קוד Scala מתוך רשימת קוד Scala.
      nano wordcount.scala
        
    3. מפעילים את spark-shell REPL.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. מריצים את wordcount.scala באמצעות הפקודה :load wordcount.scala כדי ליצור את הטבלה wordcount_output ב-BigQuery. ברשימת הפלט מוצגות 20 שורות מתוך הפלט של ספירת המילים.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      כדי לראות תצוגה מקדימה של טבלת הפלט, פותחים את הדף BigQuery, בוחרים את הטבלה wordcount_output ולוחצים על תצוגה מקדימה. תצוגה מקדימה של הטבלה בדף BigQuery Explorer ב-מסוף Cloud.

PySpark

  1. בודקים את הקוד ומחליפים את placeholder‏ [bucket] בקטגוריה של Cloud Storage שיצרתם קודם.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data:samples.shakespeare') \
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  2. הפעלת הקוד באשכול
    1. שימוש ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark
      1. נכנסים לדף Managed Service for Apache Spark Clusters במסוף Google Cloud ולוחצים על שם האשכול הדף Clusters במסוף Cloud..
      2. בדף פרטי האשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. אחר כך, לוחצים על SSH משמאל לשם של הצומת הראשי של האשכול בדף פרטי האשכול במסוף Cloud, בוחרים באפשרות SSH בשורה של שם האשכול.
        חלון דפדפן נפתח בספריית הבית שלכם בצומת הראשי
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. יוצרים wordcount.py באמצעות עורך הטקסט vi,‏ vim או nano שהותקן מראש, ואז מדביקים את קוד PySpark מתוך רשימת קוד PySpark.
      nano wordcount.py
      
    3. מריצים את wordcount עם spark-submit כדי ליצור את הטבלה wordcount_output ב-BigQuery. ברשימת הפלט מוצגות 20 שורות מפלט ספירת המילים.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      כדי לראות תצוגה מקדימה של טבלת הפלט, פותחים את הדף BigQuery, בוחרים את הטבלה wordcount_output ולוחצים על תצוגה מקדימה. תצוגה מקדימה של הטבלה בדף BigQuery Explorer ב-מסוף Cloud.

טיפים לפתרון בעיות

אפשר לבדוק את יומני העבודות ב-Cloud Logging ובכלי לבדיקת עבודות ב-BigQuery כדי לפתור בעיות בעבודות Spark שמשתמשות במחבר BigQuery.

  • יומני מנהלי ההתקנים של Managed Service for Apache Spark מכילים רשומה של BigQueryClient עם מטא-נתונים של BigQuery שכוללים את jobId:

    ClassNotFoundException INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION}
    
  • משימות BigQuery מכילות תוויות Managed Service for Apache Spark_job_id ו-Managed Service for Apache Spark_job_uuid:

    • רישום ביומן:
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID"
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID"
      protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
      
    • ‫BigQuery Jobs Explorer: לוחצים על מזהה משימה כדי לראות את פרטי המשימה בקטע Labels (תוויות) ב-Job information (פרטי המשימה).

המאמרים הבאים