אפשר להשתמש ב-spark-bigquery-connector עם Apache Spark כדי לקרוא ולכתוב נתונים מ-BigQuery ואליו.
במדריך הזה מוצגת אפליקציית PySpark שמשתמשת ב-spark-bigquery-connector.
שימוש במחבר BigQuery עם עומס העבודה
במאמר גרסאות של זמן ריצה של Serverless ל-Apache Spark מוסבר איך אפשר לדעת איזו גרסה של מחבר BigQuery מותקנת בגרסת זמן הריצה של עומס העבודה של אצווה. אם המחבר לא מופיע ברשימה, אפשר לעבור לקטע הבא כדי לקבל הוראות להוספת המחבר לאפליקציות.
איך משתמשים במחבר עם Spark runtime גרסה 2.0
מחבר BigQuery לא מותקן בגרסה 2.0 של Spark runtime. כשמשתמשים ב-Spark runtime גרסה 2.0, אפשר להפוך את המחבר לזמין לאפליקציה באחת מהדרכים הבאות:
- משתמשים בפרמטר
jarsכדי להפנות לקובץ jar של מחבר כששולחים את עומס העבודה של Google Cloud Serverless for Apache Spark batch. בדוגמה הבאה מצוין קובץ jar של מחבר (רשימת קובצי ה-jar של המחברים הזמינים מופיעה במאגר GoogleCloudDataproc/spark-bigquery-connector ב-GitHub).- דוגמה ל-Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- דוגמה ל-Google Cloud CLI:
- כוללים את קובץ ה-jar של המחבר באפליקציית Spark כתלות (ראו קומפילציה מול המחבר)
חישוב העלויות
במדריך הזה נעשה שימוש ברכיבים של Google Cloudשחלים עליהם חיובים, כולל:
- Serverless (בלי שרת) ל-Apache Spark
- BigQuery
- Cloud Storage
אפשר להשתמש במחשבון עלויות כדי ליצור הערכת עלויות בהתאם לשימוש החזוי.
BigQuery I/O
בדוגמה הזו, הנתונים נקראים מ-BigQuery לתוך Spark DataFrame כדי לבצע ספירת מילים באמצעות API סטנדרטי של מקור נתונים.
המחבר כותב את הפלט של ספירת המילים ל-BigQuery באופן הבא:
העברת הנתונים למאגר זמני בקטגוריה של Cloud Storage
העתקת הנתונים בפעולה אחת מהקטגוריה שלכם ב-Cloud Storage אל BigQuery
מחיקת הקבצים הזמניים ב-Cloud Storage אחרי השלמת פעולת הטעינה ב-BigQuery (הקבצים הזמניים נמחקים גם אחרי סיום האפליקציה של Spark). אם המחיקה נכשלת, צריך למחוק קבצים זמניים לא רצויים ב-Cloud Storage, שבדרך כלל נמצאים בתיקייה
gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.
הגדרת חיוב
כברירת מחדל, הפרויקט שמשויך לפרטי הכניסה או לחשבון השירות מחויב על השימוש ב-API. כדי לחייב פרויקט אחר, מגדירים את ההגדרה הבאה: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
אפשר גם להוסיף פעולת קריאה או כתיבה, באופן הבא:
.option("parentProject", "<BILLED-GCP-PROJECT>").
הגשת עומס עבודה של ספירת מילים ב-PySpark
הרצת עומס עבודה של אצווה ב-Spark שסופר את מספר המילים במערך נתונים ציבורי.
- פותחים טרמינל מקומי או Cloud Shell.
- יוצרים את
wordcount_datasetבאמצעות כלי שורת הפקודה bq בטרמינל מקומי או ב-Cloud Shell.bq mk wordcount_dataset
- יוצרים קטגוריה של Cloud Storage באמצעות Google Cloud CLI.
מחליפים אתgcloud storage buckets create gs://YOUR_BUCKET
YOUR_BUCKETבשם של קטגוריית Cloud Storage שיצרתם. - יוצרים את הקובץ
wordcount.pyבאופן מקומי בכלי לעריכת טקסט על ידי העתקת קוד PySpark הבא.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- שליחת עומס העבודה של קבוצת PySpark:
פלט לדוגמה של מסוף:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=YOUR_BUCKET
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
כדי לראות תצוגה מקדימה של טבלת הפלט במסוף Google Cloud , פותחים את הדף BigQuery בפרויקט, בוחרים את הטבלהwordcount_outputולוחצים על תצוגה מקדימה.
למידע נוסף
- BigQuery Storage & Spark SQL - Python
- יצירת קובץ הגדרת טבלה למקור נתונים חיצוני
- שימוש בנתונים שחולקו למחיצות באופן חיצוני