המחבר spark-bigquery-connector משמש עם Apache Spark לקריאה ולכתיבה של נתונים מ-BigQuery ואליו. המחבר משתמש ב-BigQuery Storage API כשקוראים נתונים מ-BigQuery.
במדריך הזה מוסבר על הזמינות של המחבר שהותקן מראש, ומוצגות הוראות להפיכת גרסה ספציפית של מחבר לזמינה לעבודות Spark. בדוגמה לקוד אפשר לראות איך להשתמש במחבר Spark BigQuery באפליקציית Spark.
שימוש במחבר שמותקן מראש
מחבר Spark BigQuery מותקן מראש וזמין למשימות Spark שמופעלות באשכולות Dataproc שנוצרו עם גרסאות אימג' 2.1 ואילך. גרסת המחבר שמותקנת מראש מופיעה בכל דף של גרסת תמונה. לדוגמה, בשורה BigQuery Connector בדף 2.2.x image release versions מוצגת גרסת המחבר שמותקנת בגרסאות האחרונות של תמונות 2.2.
הפיכת גרסה ספציפית של מחבר לזמינה לעבודות Spark
אם רוצים להשתמש בגרסה של מחבר ששונה מהגרסה שהותקנה מראש באשכול של גרסת תמונה 2.1 ומעלה, או אם רוצים להתקין את המחבר באשכול של גרסת תמונה שקודמת ל-2.1, צריך לפעול לפי ההוראות שבקטע הזה.
חשוב: גרסה spark-bigquery-connector צריכה להיות תואמת לגרסת תמונת האשכול של Dataproc. מטריצת התאימות של המחבר ל-Dataproc Image
אשכולות בגרסת תמונה 2.1 ואילך
כשיוצרים אשכול Dataproc עם גרסת תמונה 2.1 ואילך, מציינים את גרסת המחבר כמטא-נתונים של האשכול.
דוגמה ל-CLI של gcloud:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
הערות:
SPARK_BQ_CONNECTOR_VERSION: ציון גרסת מחבר. גרסאות של מחבר Spark BigQuery מפורטות בדף spark-bigquery-connector/releases ב-GitHub.
דוגמה:
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL: מציינים כתובת URL שמפנה לקובץ ה-JAR ב-Cloud Storage. אפשר לציין את כתובת ה-URL של מחבר שמופיע בעמודה link בקטע Downloading and Using the Connector ב-GitHub, או את הנתיב למיקום ב-Cloud Storage שבו שמרתם קובץ jar של מחבר מותאם אישית.
דוגמאות:
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
2.0 וקבוצות של גרסאות קודמות של תמונות
אפשר להשתמש במחבר Spark BigQuery באפליקציה באחת מהדרכים הבאות:
כדי להתקין את spark-bigquery-connector בספריית ה-JAR של Spark בכל צומת, משתמשים בפעולת האתחול של מחברי Dataproc כשיוצרים את האשכול.
כששולחים את העבודה לאשכול באמצעות Google Cloud המסוף, ה-CLI של gcloud או Dataproc API, צריך לספק את כתובת ה-URL של קובץ ה-JAR של המחבר.
המסוף
משתמשים בפריט Jars files של משימת Spark בדף Submit a job ב-Dataproc.
gcloud
API
משתמשים בשדה
SparkJob.jarFileUris.איך מציינים את קובץ ה-JAR של המחבר כשמריצים משימות Spark באשכולות של גרסת תמונה שקודמת לגרסה 2.0
- מציינים את קובץ ה-JAR של המחבר על ידי החלפת המידע על גרסת המחבר ו-Scala במחרוזת ה-URI הבאה:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- שימוש ב-Scala
2.12עם גרסאות של תמונות Dataproc1.5+ דוגמה ל-CLI של gcloud:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- שימוש ב-Scala
2.11עם גרסאות של תמונות Dataproc1.4וגרסאות קודמות: דוגמה ל-CLI של gcloud:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- מציינים את קובץ ה-JAR של המחבר על ידי החלפת המידע על גרסת המחבר ו-Scala במחרוזת ה-URI הבאה:
כוללים את קובץ ה-jar של המחבר באפליקציית Scala או Java Spark כתלות (ראו קומפילציה מול המחבר).
חישוב העלויות
במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:
- Dataproc
- BigQuery
- Cloud Storage
כדי ליצור הערכת עלויות בהתאם לשימוש החזוי, אפשר להשתמש במחשבון התמחור.
קריאה וכתיבה של נתונים מ-BigQuery ואליו
בדוגמה הזו, הנתונים נקראים מ-BigQuery לתוך Spark DataFrame כדי לבצע ספירת מילים באמצעות API סטנדרטי של מקור נתונים.
המחבר כותב את הנתונים ל-BigQuery אחרי שהוא מאחסן את כל הנתונים בטבלה זמנית ב-Cloud Storage. לאחר מכן, המערכת מעתיקה את כל הנתונים מ- ל-BigQuery בפעולה אחת. המחבר מנסה למחוק את הקבצים הזמניים אחרי שפעולת הטעינה של BigQuery מצליחה, ושוב כשאפליקציית Spark מסתיימת.
אם העבודה נכשלת, מסירים את כל הקבצים הזמניים שנותרו ב-Cloud Storage. בדרך כלל, קבצים זמניים של BigQuery נמצאים בתיקייה gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
הגדרת חיוב
כברירת מחדל, הפרויקט שמשויך לפרטי הכניסה או לחשבון השירות מחויב על השימוש ב-API. כדי לחייב פרויקט אחר, מגדירים את ההגדרה הבאה: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
אפשר גם להוסיף אותו לפעולת קריאה או כתיבה, באופן הבא:
.option("parentProject", "<BILLED-GCP-PROJECT>").
הרצת הקוד
לפני שמריצים את הדוגמה הזו, צריך ליצור מערך נתונים בשם wordcount_dataset או לשנות את מערך הנתונים של הפלט בקוד למערך נתונים קיים ב-BigQuery בפרויקטGoogle Cloud .
משתמשים בפקודה bq כדי ליצור את wordcount_dataset:
bq mk wordcount_dataset
משתמשים בפקודה Google Cloud CLI כדי ליצור קטגוריה של Cloud Storage, שתשמש לייצוא ל-BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- בודקים את הקוד ומחליפים את ה-placeholder [bucket] בקטגוריה של Cloud Storage שיצרתם קודם.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare") .cache() wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .save("wordcount_dataset.wordcount_output"))
- הרצת הקוד באשכול
- משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Dataproc
- נכנסים לדף Dataproc Clusters במסוף Google Cloud ולוחצים על שם האשכול
.
- בדף >פרטי אשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. אחר כך לוחצים על
SSHמשמאל לשם של מאסטר האשכולות>
חלון דפדפן נפתח בספריית הבית בצומת הראשיConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- נכנסים לדף Dataproc Clusters במסוף Google Cloud ולוחצים על שם האשכול
- יוצרים את
wordcount.scalaבאמצעות עורך הטקסטvi,vimאוnanoשהותקן מראש, ואז מדביקים את קוד Scala מתוך רשימת קוד Scala.nano wordcount.scala
- מפעילים את
spark-shellREPL.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- מריצים את הפקודה
:load wordcount.scalaכדי ליצור את הטבלהwordcount_outputב-BigQuery. ברשימת הפלט מוצגות 20 שורות מהפלט של ספירת המילים. :load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
כדי לראות תצוגה מקדימה של טבלת הפלט, פותחים את הדףBigQuery, בוחרים את הטבלהwordcount_outputולוחצים על תצוגה מקדימה.
- משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Dataproc
PySpark
- בודקים את הקוד ומחליפים את ה-placeholder [bucket] בקטגוריה של Cloud Storage שיצרתם קודם.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .load('bigquery-public-data:samples.shakespeare') \ words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .save('wordcount_dataset.wordcount_output')
- מריצים את הקוד באשכול
- משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Dataproc
- נכנסים לדף Dataproc Clusters במסוף Google Cloud ולוחצים על שם האשכול
.
- בדף פרטי האשכול, בוחרים בכרטיסייה 'מכונות וירטואליות'. אחר כך לוחצים על
SSHמשמאל לשם של מאסטר אשכולות (cluster master)
חלון דפדפן נפתח בספריית הבית שלכם במאסטר הראשיConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- נכנסים לדף Dataproc Clusters במסוף Google Cloud ולוחצים על שם האשכול
- יוצרים
wordcount.pyבאמצעות עורך הטקסטvi,vimאוnanoשמותקן מראש, ואז מדביקים את קוד PySpark מתוך רשימת קוד PySpark.nano wordcount.py
- מריצים את wordcount עם
spark-submitכדי ליצור את הטבלהwordcount_outputב-BigQuery. ברשימת הפלט מוצגות 20 שורות מפלט ספירת המילים. spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
כדי לראות תצוגה מקדימה של טבלת הפלט, פותחים אתBigQueryהדף, בוחרים את הטבלהwordcount_outputולוחצים על תצוגה מקדימה.
- משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Dataproc
טיפים לפתרון בעיות
אפשר לבדוק את יומני העבודות ב-Cloud Logging וב-BigQuery Jobs Explorer כדי לפתור בעיות בעבודות Spark שמשתמשות במחבר BigQuery.
יומני מנהל התקנים של Dataproc מכילים רשומה
BigQueryClientעם מטא-נתונים של BigQuery שכוללים אתjobId:ClassNotFoundException
INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION} משימות BigQuery מכילות את התוויות
Dataproc_job_idו-Dataproc_job_uuid:- רישום ביומן:
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID" protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID" protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
- BigQuery Jobs Explorer: לוחצים על מזהה משימה כדי לראות את פרטי המשימה בקטע Labels (תוויות) ב-Job information (פרטי המשימה).
- רישום ביומן:
המאמרים הבאים
- מידע נוסף זמין במאמר BigQuery Storage & Spark SQL - Python.
- איך יוצרים קובץ הגדרת טבלה למקור נתונים חיצוני
- איך שולחים שאילתות לנתונים שחולקו למחיצות באופן חיצוני
- טיפים לשיפור ביצועים של משימות Spark