רכיב Iceberg אופציונלי של Managed Service for Apache Spark

אפשר להתקין רכיבים נוספים כמו Iceberg כשיוצרים אשכול של Managed Service for Apache Spark באמצעות התכונה Optional components. בדף הזה מוסבר איך אפשר להתקין את רכיב Iceberg באשכול Managed Service for Apache Spark.

סקירה כללית

Apache Iceberg הוא פורמט טבלה פתוח למערכי נתונים אנליטיים גדולים. הוא מאפשר להשתמש בטבלאות SQL בנתונים גדולים בצורה פשוטה ואמינה, וגם מאפשר למנועים כמו Spark,‏ Trino,‏ PrestoDB,‏ Flink ו-Hive לעבוד עם אותן טבלאות בו-זמנית בצורה בטוחה.

כשמתקינים את הרכיב Apache Iceberg באשכול Managed Service for Apache Spark, הוא מתקין ספריות Iceberg ומגדיר את Spark ו-Hive כך שיפעלו עם Iceberg באשכול.

תכונות עיקריות של Iceberg

התכונות של Iceberg כוללות את האפשרויות הבאות:

  • שינוי סכימה: הוספה, הסרה או שינוי של שמות עמודות בלי לשכתב את כל הטבלה.
  • מסע בזמן: אפשר להריץ שאילתות על תמונות מצב היסטוריות של טבלאות למטרות ביקורת או חזרה למצב קודם.
  • חלוקה למחיצות מוסתרת: אופטימיזציה של פריסת הנתונים כדי להריץ שאילתות מהר יותר בלי לחשוף את פרטי המחיצות למשתמשים.
  • עסקאות ACID: שמירה על עקביות הנתונים ומניעת קונפליקטים.

גרסאות תמונה תואמות של Managed Service for Apache Spark

אפשר להתקין את רכיב Iceberg באשכולות של Managed Service for Apache Spark שנוצרו עם גרסאות תמונה 2.2.47 ואילך. גרסת Iceberg שמותקנת באשכול מפורטת בדף 2.2 release versions.

כשיוצרים Managed Service for Apache Spark עם אשכול Iceberg, מאפייני Spark ו-Hive הבאים מוגדרים לעבודה עם Iceberg.

קובץ תצורה מאפיין (property) ערך ברירת המחדל
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

התקנת הרכיב האופציונלי Iceberg

מתקינים את רכיב Iceberg כשיוצרים אשכול של Managed Service for Apache Spark. בדפים של רשימת גרסאות תמונות אשכולות של Managed Service for Apache Spark מוצגת גרסת רכיב Iceberg שכלולה בגרסאות העדכניות של תמונות אשכולות של Managed Service for Apache Spark.

מסוףGoogle Cloud

כדי ליצור אשכול של Managed Service for Apache Spark שמתקין את רכיב Iceberg, מבצעים את השלבים הבאים במסוף Google Cloud :

  1. פותחים את הדף Create cluster.
  2. לוחצים על הגדרה נוספת כדי להרחיב את הקטע.
  3. עורכים את הרכיבים האופציונליים.
  4. בחלונית שנפתחת, מסמנים את התיבה לצד Iceberg.
  5. לוחצים על Save.

‫CLI של gcloud

כדי ליצור אשכול של Managed Service for Apache Spark שבו מותקן רכיב Iceberg, משתמשים בפקודה gcloud dataproc clusters create עם הדגל --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

מחליפים את מה שכתוב בשדות הבאים:

‫API בארכיטקטורת REST

כדי ליצור אשכול של Managed Service for Apache Spark שבו מותקן הרכיב האופציונלי Iceberg, צריך לציין את Iceberg SoftwareConfig.Component כחלק מבקשת clusters.create.

שימוש בטבלאות Iceberg עם Spark ו-Hive

אחרי שיוצרים אשכול של Managed Service for Apache Spark עם רכיב Iceberg אופציונלי שמותקן באשכול, אפשר להשתמש ב-Spark וב-Hive כדי לקרוא ולכתוב נתונים בטבלת Iceberg.

Spark

הגדרת סשן Spark ל-Iceberg

אפשר להשתמש ב-CLI של gcloud באופן מקומי, או ב-REPL‏ (Read-Eval-Print Loop) של spark-shell או pyspark שפועל בצומת הראשי של מאסטר אשכולות (cluster master) של Dataproc, כדי להפעיל את התוספים של Spark ל-Iceberg ולהגדיר את קטלוג Spark לשימוש בטבלאות Iceberg.

gcloud

כדי לשלוח עבודת Spark ולהגדיר מאפייני Spark כדי להגדיר את סשן Spark ל-Iceberg, מריצים את הדוגמה הבאה של ה-CLI של gcloud בחלון טרמינל מקומי או ב-Cloud Shell.

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_NAME: שם האשכול.
  • REGION: האזור של Compute Engine.
  • CATALOG_NAME: שם קטלוג Iceberg.
  • BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.

spark-shell

כדי להגדיר סשן Spark ל-Iceberg באמצעות spark-shell REPL באשכול Managed Service for Apache Spark, מבצעים את השלבים הבאים:

  1. משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark.

  2. מריצים את הפקודה הבאה בטרמינל של סשן ה-SSH כדי להגדיר את סשן Spark ל-Iceberg.

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_NAME: שם האשכול.
  • REGION: האזור של Compute Engine.
  • CATALOG_NAME: שם קטלוג Iceberg.
  • BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.

מעטפת pyspark

כדי להגדיר סשן Spark ל-Iceberg באמצעות pyspark REPL באשכול Managed Service for Apache Spark, מבצעים את השלבים הבאים:

  1. משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark.

  2. מריצים את הפקודה הבאה בטרמינל של סשן ה-SSH כדי להגדיר את סשן Spark ל-Iceberg:

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_NAME: שם האשכול.
  • REGION: האזור של Compute Engine.
  • CATALOG_NAME: שם קטלוג Iceberg.
  • BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.

כתיבת נתונים לטבלת Iceberg

אפשר לכתוב נתונים לטבלת Iceberg באמצעות Spark. קטעי הקוד הבאים יוצרים DataFrame עם נתונים לדוגמה, יוצרים טבלת Iceberg ב-Cloud Storage ואז כותבים את הנתונים לטבלת Iceberg.

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

קריאת נתונים מטבלת Iceberg

אפשר לקרוא נתונים מטבלת Iceberg באמצעות Spark. בקטעי הקוד הבאים, המערכת קוראת את הטבלה ואז מציגה את התוכן שלה.

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

Hive

יצירת טבלת Iceberg ב-Hive

ב-Managed Service for Apache Spark, אשכולות של Apache Spark מוגדרים מראש לעבודה עם Iceberg.

כדי להריץ את קטעי הקוד בקטע הזה, צריך לבצע את השלבים הבאים:

  1. משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark.

  2. מציגים את beeline בחלון הטרמינל של SSH.

    beeline -u jdbc:hive2://
    

אפשר ליצור ב-Hive טבלת Iceberg לא מחולקת או מחולקת למחיצות.

טבלה לא מחולקת

יוצרים טבלת Iceberg לא מחולקת ב-Hive.

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

טבלה מחולקת למחיצות (Partitions)

כדי ליצור טבלת Iceberg עם חלוקה למחיצות ב-Hive, מציינים את עמודות החלוקה למחיצות במשפט PARTITIONED BY.

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

הוספת נתונים לטבלת Iceberg ב-Hive

אפשר להוסיף נתונים לטבלת Iceberg באמצעות הצהרות INSERT סטנדרטיות של Hive.

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

מגבלות

  • מנוע ההפעלה MR (MapReduce) נתמך רק בפעולות DML (שפת טיפול בנתונים).
  • הוצאנו משימוש את ההפעלה של MR ב-Hive 3.1.3.

קריאת נתונים מטבלת Iceberg ב-Hive

כדי לקרוא נתונים מטבלת Iceberg, משתמשים בהצהרת SELECT.

SELECT * FROM my_table;

מחיקת טבלה של Iceberg ב-Hive.

כדי להסיר טבלה של Iceberg ב-Hive, משתמשים בהצהרה DROP TABLE.

DROP TABLE my_table;

המאמרים הבאים