שימוש בקטלוג של זמן הריצה של Lakehouse עם טבלאות ב-BigQuery

במאמר הזה מובאת דוגמה לשימוש בקטלוג זמן הריצה של Lakehouse עם טבלאות BigQuery ועם Managed Service for Apache Spark.

באמצעות קטלוג זמן הריצה של Lakehouse, אתם יכולים ליצור ולהשתמש בטבלאות סטנדרטיות (מובנות), בטבלאות Apache Iceberg מנוהלות של BigQuery ובטבלאות Apache Iceberg חיצוניות מ-BigQuery.

לפני שמתחילים

  1. מפעילים את החיוב בפרויקט Google Cloud . כך בודקים אם החיוב מופעל בפרויקט
  2. מפעילים את ממשקי ה-API של BigQuery ו-Dataproc.

    הפעלת ממשקי ה-API

התפקידים הנדרשים

כדי לקבל את ההרשאות שדרושות לשימוש ב-Managed Service for Apache Spark עם קטלוג זמן הריצה של Lakehouse כמאגר מטא-נתונים, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

קישור לטבלה

  1. יוצרים מערך נתונים במסוף Google Cloud .

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט ב- Google Cloud שבו רוצים ליצור את מערך הנתונים.
    • DATASET_NAME: שם למערך הנתונים.
  2. יוצרים קישור למשאבים ב-Cloud.

  3. יוצרים טבלה רגילה ב-BigQuery.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    מחליפים את מה שכתוב בשדות הבאים:

    • TABLE_NAME: שם לטבלה.
  4. הכנסת נתונים לטבלה ב-BigQuery הסטנדרטית.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. יוצרים טבלה מנוהלת של Apache Iceberg ב-BigQuery.

    לדוגמה, כדי ליצור טבלה, מריצים את ההצהרה CREATE הבאה.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    מחליפים את מה שכתוב בשדות הבאים:

    • ICEBERG_TABLE_NAME: שם לטבלת Apache Iceberg המנוהלת. לדוגמה, iceberg_managed_table.
    • CONNECTION_NAME: השם של החיבור. יצרתם את זה בשלב הקודם. לדוגמה, myproject.us.myconnection.
    • STORAGE_URI: ‏URI מלא של Cloud Storage. לדוגמה, gs://mybucket/table.
  6. הכנסת נתונים לטבלת Apache Iceberg מנוהלת ב-BigQuery.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. יוצרים טבלת Apache Iceberg חיצונית.

    לדוגמה, כדי ליצור טבלת Apache Iceberg חיצונית, מריצים את ההצהרה הבאה של CREATE.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    מחליפים את מה שכתוב בשדות הבאים:

    • READONLY_ICEBERG_TABLE_NAME: שם לטבלה לקריאה בלבד.
    • BUCKET_PATH: הנתיב לקטגוריית Cloud Storage שמכילה את הנתונים של הטבלה החיצונית, בפורמט ['gs://bucket_name/[folder_name/]file_name'].
  8. מ-Apache Spark, מריצים שאילתה בטבלה רגילה, בטבלה מנוהלת של Apache Iceberg ב-BigQuery ובטבלה חיצונית של Apache Iceberg.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("Lakehouse runtime catalog Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the Lakehouse runtime catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query the tables
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    מחליפים את מה שכתוב בשדות הבאים:

    • WAREHOUSE_DIRECTORY: ה-URI של תיקיית Cloud Storage שמקושרת לטבלת Apache Iceberg המנוהלת ב-BigQuery ולטבלת Apache Iceberg החיצונית.
    • CATALOG_NAME: השם של הקטלוג שבו אתם משתמשים.
    • MATERIALIZATION_NAMESPACE: מרחב השמות לאחסון תוצאות זמניות.
  9. מריצים את סקריפט Apache Spark באמצעות Managed Service for Apache Spark.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \

    מחליפים את מה שכתוב בשדות הבאים:

    • SCRIPT_PATH: הנתיב לסקריפט שבו נעשה שימוש במשימת האצווה.
    • PROJECT_ID: מזהה Google Cloud הפרויקט שבו תופעל משימה באצווה.
    • REGION: האזור שבו עומס העבודה פועל.
    • YOUR_BUCKET: המיקום של קטגוריית Cloud Storage להעלאת התלויות של עומס העבודה. אין צורך להוסיף את הקידומת gs:// של ה-URI של הדלי. אפשר לציין את נתיב הקטגוריה או את שם הקטגוריה, לדוגמה, mybucketname1.

המאמרים הבאים