תכונות נוספות של קטלוג זמן הריצה של Lakehouse

כדי להתאים אישית את ההגדרה של קטלוג זמן הריצה של Lakehouse, אפשר להשתמש בתכונות הנוספות הבאות:

  • פרוצדורות של Apache Spark Iceberg.
  • אפשרות הסינון לטבלאות שלא נתמכות.
  • שינוי ברירת המחדל של החיבור ל-BigQuery.
  • כללי מדיניות של בקרת גישה לטבלאות Iceberg בקטלוג של זמן הריצה של Lakehouse.

שימוש בפרוצדורות של Apache Iceberg Spark

כדי להשתמש בפרוצדורות של Apache Spark, צריך לכלול את תוספי Apache Iceberg SQL בהגדרות של Apache Spark. לדוגמה, אפשר ליצור הליך לחזרה למצב קודם.

שימוש ב-Apache Spark SQL אינטראקטיבי כדי לחזור למצב קודם

אתם יכולים להשתמש בהליך Apache Spark כדי ליצור טבלה, לשנות אותה ולהחזיר אותה למצב הקודם. לדוגמה:

  1. יוצרים טבלת Apache Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    מחליפים את מה שכתוב בשדות הבאים:

    • BIGLAKE_ICEBERG_CATALOG_JAR: ה-URI של Cloud Storage של תוסף הקטלוג המותאם אישית של Apache Iceberg שבו רוצים להשתמש. בהתאם למספר הגרסה של Apache Iceberg, בוחרים באחת מהאפשרויות הבאות:
      • ‫Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
      • ‫Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
    • CATALOG_NAME: שם הקטלוג שמפנה לטבלת Apache Spark.
    • PROJECT_ID: מזהה Google Cloud הפרויקט.

    • WAREHOUSE_DIRECTORY: ה-URI של תיקיית Cloud Storage שבה מאוחסן מחסן הנתונים.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    מחליפים את מה שכתוב בשדות הבאים:

    • NAMESPACE_NAME: השם של מרחב השמות שמפנה לטבלת Apache Spark.
    • TABLE_NAME: שם של טבלה שמפנה לטבלת Apache Spark.

    הפלט מכיל פרטים על הגדרת הטבלה:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. משנים שוב את הטבלה, ואז מחזירים אותה למצב הקודם שבו נוצר הסנאפשוט 1659239298328512231:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    מחליפים את מה שכתוב בשדות הבאים:

    • SNAPSHOT_ID: המזהה של ה-snapshot שאליו רוצים לבצע שחזור.

    הפלט אמור להיראות כך:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

סינון טבלאות לא נתמכות מפונקציות של רשימת טבלאות

כשמשתמשים ב-Apache Spark SQL עם קטלוג זמן הריצה של Lakehouse, הפקודה SHOW TABLES מציגה את כל הטבלאות במרחב השמות שצוין, גם את אלה שלא תואמות ל-Apache Spark.

כדי להציג רק טבלאות נתמכות, מפעילים את האפשרות filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

מחליפים את מה שכתוב בשדות הבאים:

  • BIGLAKE_ICEBERG_CATALOG_JAR: ה-URI של Cloud Storage של תוסף הקטלוג המותאם אישית של Apache Iceberg שבו רוצים להשתמש. בהתאם למספר הגרסה של Apache Iceberg, בוחרים באחת מהאפשרויות הבאות:
    • ‫Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
    • ‫Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
  • CATALOG_NAME: השם של קטלוג Apache Spark שבו רוצים להשתמש.
  • PROJECT_ID: המזהה של Google Cloud הפרויקט שבו רוצים להשתמש.
  • LOCATION: המיקום של המשאבים ב-BigQuery.
  • WAREHOUSE_DIRECTORY: תיקיית Cloud Storage שבה רוצים להשתמש כמחסן נתונים.

הגדרת שינוי של חיבור ל-BigQuery

אפשר להשתמש בחיבורים ל-BigQuery כדי לגשת לנתונים שמאוחסנים מחוץ ל-BigQuery, למשל ב-Cloud Storage.

כדי להגדיר שינוי של חיבור BigQuery שיספק גישה לקטגוריה של Cloud Storage, צריך לבצע את הפעולות הבאות:

  1. בפרויקט BigQuery, יוצרים חיבור חדש למשאב Cloud Storage. החיבור הזה מגדיר איך BigQuery ניגש לנתונים שלכם.

  2. מקצים למשתמש או לחשבון השירות שמקבלים גישה לנתונים את התפקיד roles/bigquery.connectionUser בחיבור.

    חשוב לוודא שמשאב החיבור נמצא באותו מיקום כמו משאבי היעד ב-BigQuery. מידע נוסף מופיע במאמר בנושא ניהול קישורים.

  3. מציינים את החיבור בטבלת Apache Iceberg באמצעות המאפיין bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    מחליפים את מה שכתוב בשדות הבאים:

    • TABLE_NAME: שם הטבלה שלכם ב-Apache Spark.
    • WAREHOUSE_DIRECTORY: ה-URI של קטגוריית Cloud Storage שבה מאוחסנים הנתונים.
    • PROJECT_ID: המזהה של Google Cloud הפרויקט שבו רוצים להשתמש.
    • LOCATION: המיקום של החיבור.
    • CONNECTION_ID: מזהה החיבור.

הגדרת מדיניות בקרת גישה

אפשר להגדיר ב-BigQuery כללי מדיניות של בקרת גישה כדי להפעיל בקרת גישה פרטנית (FGAC) בטבלת Iceberg מנוהלת. אפשר להגדיר מדיניות של בקרת גישה רק בטבלאות שמשתמשות בהחלפה של חיבור BigQuery. אפשר להגדיר את המדיניות הזו בדרכים הבאות:

אחרי שמגדירים את מדיניות FGAC, אפשר להריץ שאילתה בטבלה מ-Apache Spark באמצעות הדוגמה הבאה:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("Lakehouse runtime catalog Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE NAMESPACE IF NOT EXISTS MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

מחליפים את מה שכתוב בשדות הבאים:

  • CATALOG_NAME: השם של הקטלוג.
  • PROJECT_ID: מזהה הפרויקט שמכיל את המשאבים שלכם ב-BigQuery.
  • LOCATION: המיקום של משאבי BigQuery.
  • WAREHOUSE_DIRECTORY: ה-URI של תיקיית Cloud Storage שמכילה את מחסן הנתונים.
  • MATERIALIZATION_NAMESPACE: מרחב השמות שבו רוצים לאחסן תוצאות זמניות.
  • DATASET_NAME: השם של מערך הנתונים שמכיל את הטבלה שאתם שולחים לגביה שאילתה.
  • ICEBERG_TABLE_NAME: שם הטבלה שרוצים לשלוח לגביה שאילתה.

המאמרים הבאים