שימוש במחבר BigQuery עם Google Cloud Serverless for Apache Spark

אפשר להשתמש ב-spark-bigquery-connector עם Apache Spark כדי לקרוא ולכתוב נתונים מ-BigQuery ואליו. במדריך הזה מוצגת אפליקציית PySpark שמשתמשת ב-spark-bigquery-connector.

שימוש במחבר BigQuery עם עומס העבודה

במאמר גרסאות של זמן ריצה של Serverless ל-Apache Spark מוסבר איך אפשר לדעת איזו גרסה של מחבר BigQuery מותקנת בגרסת זמן הריצה של עומס העבודה של אצווה. אם המחבר לא מופיע ברשימה, אפשר לעבור לקטע הבא כדי לקבל הוראות להוספת המחבר לאפליקציות.

איך משתמשים במחבר עם Spark runtime גרסה 2.0

מחבר BigQuery לא מותקן בגרסה 2.0 של Spark runtime. כשמשתמשים ב-Spark runtime גרסה 2.0, אפשר להפוך את המחבר לזמין לאפליקציה באחת מהדרכים הבאות:

  • משתמשים בפרמטר jars כדי להפנות לקובץ jar של מחבר כששולחים את עומס העבודה של Google Cloud Serverless for Apache Spark batch. בדוגמה הבאה מצוין קובץ jar של מחבר (רשימת קובצי ה-jar של המחברים הזמינים מופיעה במאגר GoogleCloudDataproc/spark-bigquery-connector ב-GitHub).
    • דוגמה ל-Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • כוללים את קובץ ה-jar של המחבר באפליקציית Spark כתלות (ראו קומפילציה מול המחבר)

חישוב העלויות

במדריך הזה נעשה שימוש ברכיבים של Google Cloudשחלים עליהם חיובים, כולל:

  • ‫Serverless (בלי שרת) ל-Apache Spark
  • BigQuery
  • Cloud Storage

אפשר להשתמש במחשבון עלויות כדי ליצור הערכת עלויות בהתאם לשימוש החזוי.

משתמשים חדשים ב-Cloud Platform עשויים להיות זכאים לתקופת ניסיון בחינם.

BigQuery I/O

בדוגמה הזו, הנתונים נקראים מ-BigQuery לתוך Spark DataFrame כדי לבצע ספירת מילים באמצעות API סטנדרטי של מקור נתונים.

המחבר כותב את הפלט של ספירת המילים ל-BigQuery באופן הבא:

  1. העברת הנתונים למאגר זמני בקטגוריה של Cloud Storage

  2. העתקת הנתונים בפעולה אחת מהקטגוריה שלכם ב-Cloud Storage אל BigQuery

  3. מחיקת הקבצים הזמניים ב-Cloud Storage אחרי השלמת פעולת הטעינה ב-BigQuery (הקבצים הזמניים נמחקים גם אחרי סיום האפליקציה של Spark). אם המחיקה נכשלת, צריך למחוק קבצים זמניים לא רצויים ב-Cloud Storage, שבדרך כלל נמצאים בתיקייה gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.

הגדרת חיוב

כברירת מחדל, הפרויקט שמשויך לפרטי הכניסה או לחשבון השירות מחויב על השימוש ב-API. כדי לחייב פרויקט אחר, מגדירים את ההגדרה הבאה: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

אפשר גם להוסיף פעולת קריאה או כתיבה, באופן הבא: .option("parentProject", "<BILLED-GCP-PROJECT>").

הגשת עומס עבודה של ספירת מילים ב-PySpark

הרצת עומס עבודה של אצווה ב-Spark שסופר את מספר המילים במערך נתונים ציבורי.

  1. פותחים טרמינל מקומי או Cloud Shell.
  2. יוצרים את wordcount_dataset באמצעות כלי שורת הפקודה bq בטרמינל מקומי או ב-Cloud Shell.
    bq mk wordcount_dataset
    
  3. יוצרים קטגוריה של Cloud Storage באמצעות Google Cloud CLI.
    gcloud storage buckets create gs://YOUR_BUCKET
    
    מחליפים את YOUR_BUCKET בשם של קטגוריית Cloud Storage שיצרתם.
  4. יוצרים את הקובץ wordcount.py באופן מקומי בכלי לעריכת טקסט על ידי העתקת קוד PySpark הבא.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. שליחת עומס העבודה של קבוצת PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    פלט לדוגמה של מסוף:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    כדי לראות תצוגה מקדימה של טבלת הפלט במסוף Google Cloud , פותחים את הדף BigQuery בפרויקט, בוחרים את הטבלה wordcount_output ולוחצים על תצוגה מקדימה.

למידע נוסף