במאמר הזה מובאת דוגמה לשימוש בקטלוג זמן הריצה של Lakehouse עם טבלאות BigQuery ועם Managed Service for Apache Spark.
באמצעות קטלוג זמן הריצה של Lakehouse, אתם יכולים ליצור ולהשתמש בטבלאות סטנדרטיות (מובנות), בטבלאות Apache Iceberg מנוהלות של BigQuery ובטבלאות Apache Iceberg חיצוניות מ-BigQuery.
לפני שמתחילים
- מפעילים את החיוב בפרויקט Google Cloud . כך בודקים אם החיוב מופעל בפרויקט
מפעילים את ממשקי ה-API של BigQuery ו-Dataproc.
התפקידים הנדרשים
כדי לקבל את ההרשאות שדרושות לשימוש ב-Managed Service for Apache Spark עם קטלוג זמן הריצה של Lakehouse כמאגר מטא-נתונים, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים:
-
יוצרים טבלאות קטלוג של Lakehouse בזמן ריצה ב-Apache Spark:
- Dataproc Worker (
roles/dataproc.worker) בחשבון השירות של Managed Service for Apache Spark בפרויקט - BigQuery Data Editor (
roles/bigquery.dataEditor) בחשבון שירות Managed Service for Apache Spark בפרויקט - משתמש באובייקטים באחסון (
roles/storage.objectUser) בחשבון השירות של Managed Service for Apache Spark בפרויקט
- Dataproc Worker (
-
שליחת שאילתות על טבלאות קטלוג של Lakehouse בזמן ריצה ב-BigQuery:
- BigQuery Data Viewer (
roles/bigquery.dataViewer) בפרויקט - BigQuery User (
roles/bigquery.user) בפרויקט - צפייה באובייקטים באחסון (
roles/storage.objectViewer) בפרויקט
- BigQuery Data Viewer (
להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.
קישור לטבלה
יוצרים מערך נתונים במסוף Google Cloud .
CREATE SCHEMA `
PROJECT_ID`.DATASET_NAME;מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט ב- Google Cloud שבו רוצים ליצור את מערך הנתונים. -
DATASET_NAME: שם למערך הנתונים.
-
יוצרים קישור למשאבים ב-Cloud.
יוצרים טבלה רגילה ב-BigQuery.
CREATE TABLE `
PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);מחליפים את מה שכתוב בשדות הבאים:
-
TABLE_NAME: שם לטבלה.
-
הכנסת נתונים לטבלה ב-BigQuery הסטנדרטית.
INSERT INTO `
PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);יוצרים טבלה מנוהלת של Apache Iceberg ב-BigQuery.
לדוגמה, כדי ליצור טבלה, מריצים את ההצהרה
CREATEהבאה.CREATE TABLE `
PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME( name STRING,id INT64 ) WITH CONNECTION `CONNECTION_NAME` OPTIONS ( file_format = 'PARQUET', table_format = 'ICEBERG', storage_uri = 'STORAGE_URI');מחליפים את מה שכתוב בשדות הבאים:
-
ICEBERG_TABLE_NAME: שם לטבלת Apache Iceberg המנוהלת. לדוגמה,iceberg_managed_table. -
CONNECTION_NAME: השם של החיבור. יצרתם את זה בשלב הקודם. לדוגמה,myproject.us.myconnection. -
STORAGE_URI: URI מלא של Cloud Storage. לדוגמה,gs://mybucket/table.
-
הכנסת נתונים לטבלת Apache Iceberg מנוהלת ב-BigQuery.
INSERT INTO `
PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);יוצרים טבלת Apache Iceberg חיצונית.
לדוגמה, כדי ליצור טבלת Apache Iceberg חיצונית, מריצים את ההצהרה הבאה של
CREATE.CREATE OR REPLACE EXTERNAL TABLE `
PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME WITH CONNECTION `CONNECTION_NAME` OPTIONS ( format = 'ICEBERG', uris = ['BUCKET_PATH'], require_partition_filter = FALSE);מחליפים את מה שכתוב בשדות הבאים:
-
READONLY_ICEBERG_TABLE_NAME: שם לטבלה לקריאה בלבד. -
BUCKET_PATH: הנתיב לקטגוריית Cloud Storage שמכילה את הנתונים של הטבלה החיצונית, בפורמט['gs://bucket_name/[folder_name/]file_name'].
-
מ-Apache Spark, מריצים שאילתה בטבלה רגילה, בטבלה מנוהלת של Apache Iceberg ב-BigQuery ובטבלה חיצונית של Apache Iceberg.
from pyspark.sql import SparkSession # Create a spark session spark = SparkSession.builder \ .appName("Lakehouse runtime catalog Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.conf.set("viewsEnabled","true") # Use the Lakehouse runtime catalog spark.sql("USE `CATALOG_NAME`;") spark.sql("USE NAMESPACE DATASET_NAME;") # Configure spark for temp results spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") # List the tables in the dataset df = spark.sql("SHOW TABLES;") df.show(); # Query the tables sql = """SELECT * FROM DATASET_NAME.TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
מחליפים את מה שכתוב בשדות הבאים:
-
WAREHOUSE_DIRECTORY: ה-URI של תיקיית Cloud Storage שמקושרת לטבלת Apache Iceberg המנוהלת ב-BigQuery ולטבלת Apache Iceberg החיצונית. -
CATALOG_NAME: השם של הקטלוג שבו אתם משתמשים. -
MATERIALIZATION_NAMESPACE: מרחב השמות לאחסון תוצאות זמניות.
-
מריצים את סקריפט Apache Spark באמצעות Managed Service for Apache Spark.
gcloud dataproc batches submit pyspark SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=YOUR_BUCKET \
מחליפים את מה שכתוב בשדות הבאים:
-
SCRIPT_PATH: הנתיב לסקריפט שבו נעשה שימוש במשימת האצווה. -
PROJECT_ID: מזהה Google Cloud הפרויקט שבו תופעל משימה באצווה. -
REGION: האזור שבו עומס העבודה פועל. -
YOUR_BUCKET: המיקום של קטגוריית Cloud Storage להעלאת התלויות של עומס העבודה. אין צורך להוסיף את הקידומתgs://של ה-URI של הדלי. אפשר לציין את נתיב הקטגוריה או את שם הקטגוריה, לדוגמה,mybucketname1.
-