רכיב Iceberg אופציונלי ב-Dataproc

אפשר להתקין רכיבים נוספים כמו Iceberg כשיוצרים אשכול Dataproc באמצעות התכונה רכיבים אופציונליים. בדף הזה מוסבר איך אפשר להתקין את רכיב Iceberg באשכול Dataproc.

סקירה כללית

Apache Iceberg הוא פורמט טבלה פתוח למערכי נתונים אנליטיים גדולים. הוא מאפשר להשתמש בטבלאות SQL בנתונים גדולים בצורה פשוטה ואמינה, וגם מאפשר למנועים כמו Spark,‏ Trino,‏ PrestoDB,‏ Flink ו-Hive לעבוד עם אותן טבלאות בו-זמנית בצורה בטוחה.

כשמתקינים את הרכיב Apache Iceberg באשכול Dataproc, הוא מתקין ספריות Iceberg ומגדיר את Spark ו-Hive כך שיפעלו עם Iceberg באשכול.

תכונות מרכזיות של Iceberg

התכונות של Iceberg כוללות את האפשרויות הבאות:

  • שינוי סכימה: הוספה, הסרה או שינוי שם של עמודות בלי לשכתב את כל הטבלה.
  • מסע בזמן: אפשר להריץ שאילתות על תמונות מצב היסטוריות של טבלאות למטרות ביקורת או חזרה למצב קודם.
  • חלוקה למחיצות מוסתרת: אופטימיזציה של פריסת הנתונים כדי להריץ שאילתות מהר יותר בלי לחשוף את פרטי המחיצות למשתמשים.
  • עסקאות ACID: שמירה על עקביות הנתונים ומניעת קונפליקטים.

גרסאות תמונות Dataproc תואמות

אפשר להתקין את רכיב Iceberg באשכולות Dataproc שנוצרו עם גרסאות אימג' 2.2.47 ואילך. גרסת Iceberg שמותקנת באשכול מפורטת בדף 2.2 release versions.

כשיוצרים אשכול Dataproc עם Iceberg, מאפייני Spark ו-Hive הבאים מוגדרים לעבודה עם Iceberg.

קובץ תצורה מאפיין (property) ערך ברירת המחדל
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

התקנת הרכיב האופציונלי Iceberg

מתקינים את רכיב Iceberg כשיוצרים אשכול Dataproc. בדפים של רשימת גרסאות האימג' של אשכול Dataproc מוצגת גרסת רכיב Iceberg שכלולה בגרסאות האימג' העדכניות ביותר של אשכול Dataproc.

מסוףGoogle Cloud

כדי ליצור אשכול Dataproc שמתקין את רכיב Iceberg, מבצעים את השלבים הבאים במסוף Google Cloud :

  1. פותחים את הדף Dataproc Create a cluster. החלונית הגדרת אשכול נבחרת.
  2. בקטע Components, מתחת לOptional components, בוחרים ברכיב Iceberg.
  3. מאשרים או מציינים הגדרות אחרות של האשכול, ואז לוחצים על יצירה.

Google Cloud CLI

כדי ליצור אשכול Dataproc שמתקין את רכיב Iceberg, משתמשים בפקודה gcloud dataproc clusters create עם הדגל --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

מחליפים את מה שכתוב בשדות הבאים:

API ל-REST

כדי ליצור אשכול Dataproc שמתקין את הרכיב האופציונלי Iceberg, צריך לציין את Iceberg‏ SoftwareConfig.Component כחלק מבקשת clusters.create.

שימוש בטבלאות Iceberg עם Spark ו-Hive

אחרי שיוצרים אשכול Dataproc שרכיב Iceberg האופציונלי מותקן בו, אפשר להשתמש ב-Spark וב-Hive כדי לקרוא ולכתוב נתונים בטבלת Iceberg.

Spark

הגדרת סשן Spark ל-Iceberg

אפשר להשתמש ב-CLI של gcloud באופן מקומי, או ב-REPL (לולאות קריאה-הערכה-הדפסה) של spark-shell או pyspark שפועלות בצומת הראשי של מאסטר אשכולות (cluster master) של Dataproc, כדי להפעיל את התוספים של Spark ל-Iceberg ולהגדיר את קטלוג Spark לשימוש בטבלאות Iceberg.

gcloud

כדי לשלוח עבודת Spark ולהגדיר מאפייני Spark כדי להגדיר את סשן Spark ל-Iceberg, מריצים את הדוגמה הבאה של ה-CLI של gcloud בחלון טרמינל מקומי או ב-Cloud Shell.

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_NAME: שם האשכול.
  • REGION: האזור של Compute Engine.
  • CATALOG_NAME: שם קטלוג Iceberg.
  • BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.

spark-shell

כדי להגדיר סשן Spark ל-Iceberg באמצעות spark-shell REPL באשכול Dataproc, מבצעים את השלבים הבאים:

  1. משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Dataproc.

  2. מריצים את הפקודה הבאה בטרמינל של סשן ה-SSH כדי להגדיר את סשן Spark ל-Iceberg.

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_NAME: שם האשכול.
  • REGION: האזור של Compute Engine.
  • CATALOG_NAME: שם קטלוג Iceberg.
  • BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.

מעטפת pyspark

כדי להגדיר סשן Spark ל-Iceberg באמצעות pyspark REPL באשכול Dataproc, מבצעים את השלבים הבאים:

  1. משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Dataproc.

  2. מריצים את הפקודה הבאה בטרמינל של סשן ה-SSH כדי להגדיר את סשן Spark ל-Iceberg:

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_NAME: שם האשכול.
  • REGION: האזור של Compute Engine.
  • CATALOG_NAME: שם קטלוג Iceberg.
  • BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.

כתיבת נתונים לטבלת Iceberg

אפשר לכתוב נתונים לטבלת Iceberg באמצעות Spark. קטעי הקוד הבאים יוצרים DataFrame עם נתונים לדוגמה, יוצרים טבלת Iceberg ב-Cloud Storage ואז כותבים את הנתונים לטבלת Iceberg.

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

קריאת נתונים מטבלת Iceberg

אפשר לקרוא נתונים מטבלת Iceberg באמצעות Spark. קטעי הקוד הבאים קוראים את הטבלה, ואז מציגים את התוכן שלה.

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

כוורת

יצירת טבלת Iceberg ב-Hive

אשכולות Dataproc מוגדרים מראש לעבודה עם Hive ו-Iceberg.

כדי להריץ את קטעי הקוד בקטע הזה, צריך לבצע את השלבים הבאים:

  1. משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Dataproc.

  2. מציגים את beeline בחלון הטרמינל של SSH.

    beeline -u jdbc:hive2://
    

אפשר ליצור ב-Hive טבלת Iceberg לא מחולקת או מחולקת למחיצות.

טבלה לא מחולקת

יוצרים טבלת Iceberg לא מחולקת ב-Hive.

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

טבלה מחולקת למחיצות (Partitions)

יוצרים טבלת Iceberg עם חלוקה למחיצות ב-Hive על ידי ציון עמודות החלוקה למחיצות בסעיף PARTITIONED BY.

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

הוספת נתונים לטבלת Iceberg ב-Hive

אפשר להוסיף נתונים לטבלת Iceberg באמצעות הצהרות INSERT סטנדרטיות של Hive.

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

מגבלות

  • מנוע הביצוע MR ‏ (MapReduce) נתמך רק בפעולות DML ‏(data manipulation language).
  • הוצאנו משימוש את ההפעלה של MR ב-Hive 3.1.3.

קריאת נתונים מטבלת Iceberg ב-Hive

כדי לקרוא נתונים מטבלת Iceberg, משתמשים בהצהרת SELECT.

SELECT * FROM my_table;

משחררים טבלת Iceberg ב-Hive.

כדי להשליך טבלת Iceberg ב-Hive, משתמשים בהצהרת DROP TABLE.

DROP TABLE my_table;

המאמרים הבאים