שליחת שאילתות לטבלאות ב-BigQuery
במאמר הזה מוסבר איך אפשר להשתמש ב-Spark SQL וב-Spark DataFrame API בעומסי עבודה של Managed Service for Apache Spark כדי לשלוח שאילתות לטבלאות BigQuery.
לפני שמתחילים
מפעילים את ממשקי ה-API, ואם צריך, מעניקים תפקידים של ניהול זהויות והרשאות גישה.
הפעלת ממשקי ה-API
- נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
הענקת תפקידים בניהול הזהויות והרשאות הגישה
כדי להריץ את הדוגמאות בדף הזה, צריך להעניק הרשאות תפקיד ל-Managed Service for Apache Spark ול-BigQuery. יכול להיות שהתפקידים האלה כבר הוקצו, בהתאם למדיניות הארגון. כדי לבדוק את התפקידים שהוקצו, ראו האם צריך להקצות תפקידים?.
תפקידים ב-Managed Service for Apache Spark
כברירת מחדל, הפעלות של אצוות וסשנים מתבצעות באמצעות חשבון השירות שמוגדר כברירת מחדל ב-Compute Engine, אלא אם מצוין חשבון שירות מותאם אישית לעומס העבודה או לסשן.
התפקיד 'משתמש בחשבון שירות'
כדי לקבל את ההרשאות שנדרשות לשליחת עומס עבודה של אצווה, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד משתמש בחשבון שירות (roles/iam.serviceAccountUser) בחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine.
כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.
תפקיד Dataproc Worker
כדי לוודא שלחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine יש את ההרשאות שנדרשות לשליחת עומס עבודה של Batch, צריך לבקש מהאדמין להקצות לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine את תפקיד ה-IAM Dataproc Worker (roles/dataproc.worker) בפרויקט.
יכול להיות שהאדמין גם יוכל לתת לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine את ההרשאות שנדרשות באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.
תפקידים ב-BigQuery
לחשבון השירות שמשמש להפעלת עומס עבודה של אצווה או סשן אינטראקטיבי של Managed Service for Apache Spark, צריך להקצות את תפקידי ה-IAM הבאים במשאבים הבאים:
BigQuery Data Viewer (
roles/bigquery.dataViewer) כדי לקרוא נתונים מטבלאות, באופן הבא:- קריאה מ-bigquery.DATASET_ID.SOURCE_TABLE בדוגמאות של Spark SQL SELECT ו-INSERT INTO.
- קריאה מ-INFORMATION_SCHEMA בדוגמה של DataFrame API.
BigQuery User (
roles/bigquery.user) כדי לאפשר ל-Spark להריץ משימות שמתקשרות עם BigQuery.BigQuery Data Editor (
roles/bigquery.dataEditor) כדי לכתוב נתונים או מטא-נתונים, באופן הבא:- בדוגמה של Spark SQL INSERT INTO, כדי לכתוב אל bigquery.DATASET_ID.DESTINATION_TABLE.
- בדוגמה של DataFrame API לשאילתת INFORMATION_SCHEMA, נדרש התפקיד הזה ב-DATASET_ID שצוין ב-
.option('materializationDataset', ...)כדי לאפשר למחבר ליצור טבלאות זמניות לתוצאות.
שליחת עומס עבודה באצווה של Spark
אפשר להשתמש במסוף, ב-Google Cloud CLI או ב-API של Managed Service for Apache Spark כדי לשלוח עומס עבודה של אצווה ב-Managed Service for Apache Spark. Google Cloud
שימוש ב-Spark SQL
אפשר להשתמש בקטלוג Spark BigQuery כדי להריץ שאילתות על טבלאות BigQuery רגילות ישירות מתוך עומסי עבודה של אצווה או מתוך סשנים אינטראקטיביים. השיטה הזו מאפשרת להשתמש בתחביר סטנדרטי של GoogleSQL כדי ליצור אינטראקציה עם נתוני BigQuery במשימות spark-sql בלי לכתוב קוד PySpark או ליצור תצוגות זמניות באמצעות DataFrame API.
הגדרת הקטלוג של BigQuery
כדי להפעיל את קטלוג BigQuery, צריך לספק את מאפייני Spark הבאים לעומס העבודה של Spark SQL או לסשן האינטראקטיבי:
-
dataproc.sparkBqConnector.version=CONNECTOR_VERSION: מציין את הגרסה של מחבר Spark BigQuery. -
spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (אופציונלי) רישום קטלוגbigqueryכקטלוג Spark SQL.
דוגמה ל-Google Cloud CLI:
gcloud dataproc batches submit spark-sql \
--project=PROJECT_ID \
--region=REGION \
--version=RUNTIME_VERSION \
--subnet=SUBNET \
--service-account=SERVICE_ACCOUNT \
--properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
gs://BUCKET/my_query.sql
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud . -
REGION: האזור שבו יופעל ה-Batch -
RUNTIME_VERSION: אופציונלי. גרסת זמן הריצה של Managed Service for Apache Spark. אם לא מציינים גרסה, המערכת בוחרת את גרסת זמן הריצה שמוגדרת כברירת מחדל. -
CONNECTOR_VERSION: גרסת Spark BigQuery connector. כדי למצוא גרסת מחבר שתואמת ל-RUNTIME_VERSION, אפשר לעיין במאמר גרסאות זמן הריצה של Managed Service for Apache Spark. אם המחבר לא מותקן מראש, אפשר למצוא גרסאות זמינות בדף הגרסאות של GitHub. -
SUBNET: אופציונלי. רשת משנה לשימוש בעומס העבודה של האצווה. אם לא מציינים רשת משנה, המערכת משתמשת בdefault. -
SERVICE_ACCOUNT: אופציונלי. חשבון השירות שדרכו תופעל משימת האצווה. אם לא מציינים חשבון שירות, המערכת משתמשת בחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. -
BUCKET: קטגוריית Cloud Storage שמכילה את קובץ ה-SQL.
שליחת שאילתות לטבלאות ב-BigQuery
אחרי שמגדירים את הקטלוג, אפשר להפנות לטבלאות BigQuery בסקריפט SQL באמצעות הפורמט הבא: bigquery.DATASET_ID.TABLE_ID.
דוגמה לשאילתת SQL:
-- Query data from a BigQuery table.
SELECT
column_a,
SUM(column_b)
FROM
bigquery.DATASET_ID.SOURCE_TABLE
WHERE
partition_date = CURRENT_DATE()
GROUP BY column_a;
-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';
מחליפים את מה שכתוב בשדות הבאים:
-
DATASET_ID: מזהה קבוצת נתונים ב-BigQuery. -
SOURCE_TABLE: המזהה של הטבלה שרוצים לשלוח אליה שאילתה. -
DESTINATION_TABLE: המזהה של הטבלה שאליה רוצים להוסיף נתונים.
שימוש ב-DataFrame API
כדי לגשת לתצוגות של INFORMATION_SCHEMA, צריך להשתמש ב-DataFrame API.
כדי לשלוח שאילתה ל-
INFORMATION_SCHEMA:- מגדירים את
spark.conf.set('viewsEnabled', 'true'). - צריך לספק
.option('materializationDataset', 'DATASET_ID')כדי שהמחבר יוכל לכתוב תוצאות זמניות.
- מגדירים את
דוגמה לשאילתת PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()
# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')
# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
.option('project', 'PROJECT_ID') \
.option('materializationDataset', 'DATASET_ID') \
.load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. מזהי הפרויקטים מופיעים בקטע Project info בלוח הבקרה של מסוף Google Cloud . -
DATASET_ID: מזהה קבוצת הנתונים ב-BigQuery שבה מחבר SparkvBigQuery יכול לכתוב נתונים זמניים.
במאמר שליחת עומס עבודה של אצווה לחישוב מילים ב-PySpark יש דוגמה ל-PySpark שקוראת נתונים מטבלה רגילה ב-BigQuery, ואז כותבת את התוצאות לטבלת פלט.