אפשר להתקין רכיבים נוספים כמו Iceberg כשיוצרים אשכול של Managed Service for Apache Spark באמצעות התכונה Optional components. בדף הזה מוסבר איך אפשר להתקין את רכיב Iceberg באשכול Managed Service for Apache Spark.
סקירה כללית
Apache Iceberg הוא פורמט טבלה פתוח למערכי נתונים אנליטיים גדולים. הוא מאפשר להשתמש בטבלאות SQL בנתונים גדולים בצורה פשוטה ואמינה, וגם מאפשר למנועים כמו Spark, Trino, PrestoDB, Flink ו-Hive לעבוד עם אותן טבלאות בו-זמנית בצורה בטוחה.
כשמתקינים את הרכיב Apache Iceberg באשכול Managed Service for Apache Spark, הוא מתקין ספריות Iceberg ומגדיר את Spark ו-Hive כך שיפעלו עם Iceberg באשכול.
תכונות עיקריות של Iceberg
התכונות של Iceberg כוללות את האפשרויות הבאות:
- שינוי סכימה: הוספה, הסרה או שינוי של שמות עמודות בלי לשכתב את כל הטבלה.
- מסע בזמן: אפשר להריץ שאילתות על תמונות מצב היסטוריות של טבלאות למטרות ביקורת או חזרה למצב קודם.
- חלוקה למחיצות מוסתרת: אופטימיזציה של פריסת הנתונים כדי להריץ שאילתות מהר יותר בלי לחשוף את פרטי המחיצות למשתמשים.
- עסקאות ACID: שמירה על עקביות הנתונים ומניעת קונפליקטים.
גרסאות תמונה תואמות של Managed Service for Apache Spark
אפשר להתקין את רכיב Iceberg באשכולות של Managed Service for Apache Spark שנוצרו עם גרסאות תמונה 2.2.47 ואילך. גרסת Iceberg שמותקנת באשכול מפורטת בדף 2.2 release versions.
מאפיינים שקשורים ל-Iceberg
כשיוצרים Managed Service for Apache Spark עם אשכול Iceberg, מאפייני Spark ו-Hive הבאים מוגדרים לעבודה עם Iceberg.
| קובץ תצורה | מאפיין (property) | ערך ברירת המחדל |
|---|---|---|
/etc/spark/conf/spark-defaults.conf |
spark.sql.extensions |
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
spark.driver.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
spark.executor.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar |
iceberg.engine.hive.enabled |
true |
התקנת הרכיב האופציונלי Iceberg
מתקינים את רכיב Iceberg כשיוצרים אשכול של Managed Service for Apache Spark. בדפים של רשימת גרסאות תמונות אשכולות של Managed Service for Apache Spark מוצגת גרסת רכיב Iceberg שכלולה בגרסאות העדכניות של תמונות אשכולות של Managed Service for Apache Spark.
מסוףGoogle Cloud
כדי ליצור אשכול של Managed Service for Apache Spark שמתקין את רכיב Iceberg, מבצעים את השלבים הבאים במסוף Google Cloud :
- פותחים את הדף Create cluster.
- לוחצים על הגדרה נוספת כדי להרחיב את הקטע.
- עורכים את הרכיבים האופציונליים.
- בחלונית שנפתחת, מסמנים את התיבה לצד Iceberg.
- לוחצים על Save.
CLI של gcloud
כדי ליצור אשכול של Managed Service for Apache Spark שבו מותקן רכיב Iceberg, משתמשים בפקודה gcloud dataproc clusters create עם הדגל --optional-components.
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --optional-components=ICEBERG \ other flags ...
מחליפים את מה שכתוב בשדות הבאים:
- CLUSTER_NAME: השם החדש של האשכול.
- REGION: האזור של האשכול.
API בארכיטקטורת REST
כדי ליצור אשכול של Managed Service for Apache Spark שבו מותקן הרכיב האופציונלי Iceberg, צריך לציין את Iceberg
SoftwareConfig.Component
כחלק מבקשת
clusters.create.
שימוש בטבלאות Iceberg עם Spark ו-Hive
אחרי שיוצרים אשכול של Managed Service for Apache Spark עם רכיב Iceberg אופציונלי שמותקן באשכול, אפשר להשתמש ב-Spark וב-Hive כדי לקרוא ולכתוב נתונים בטבלת Iceberg.
Spark
הגדרת סשן Spark ל-Iceberg
אפשר להשתמש ב-CLI של gcloud באופן מקומי, או ב-REPL (Read-Eval-Print Loop) של spark-shell או pyspark שפועל בצומת הראשי של מאסטר אשכולות (cluster master) של Dataproc, כדי להפעיל את התוספים של Spark ל-Iceberg ולהגדיר את קטלוג Spark לשימוש בטבלאות Iceberg.
gcloud
כדי לשלוח עבודת Spark ולהגדיר מאפייני Spark כדי להגדיר את סשן Spark ל-Iceberg, מריצים את הדוגמה הבאה של ה-CLI של gcloud בחלון טרמינל מקומי או ב-Cloud Shell.
gcloud dataproc jobs submit spark \ --cluster=CLUSTER_NAME \ --region=REGION \ --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \ other flags ...
מחליפים את מה שכתוב בשדות הבאים:
- CLUSTER_NAME: שם האשכול.
- REGION: האזור של Compute Engine.
- CATALOG_NAME: שם קטלוג Iceberg.
- BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.
spark-shell
כדי להגדיר סשן Spark ל-Iceberg באמצעות spark-shell REPL באשכול Managed Service for Apache Spark, מבצעים את השלבים הבאים:
משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark.
מריצים את הפקודה הבאה בטרמינל של סשן ה-SSH כדי להגדיר את סשן Spark ל-Iceberg.
spark-shell \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
--conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
מחליפים את מה שכתוב בשדות הבאים:
- CLUSTER_NAME: שם האשכול.
- REGION: האזור של Compute Engine.
- CATALOG_NAME: שם קטלוג Iceberg.
- BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.
מעטפת pyspark
כדי להגדיר סשן Spark ל-Iceberg באמצעות pyspark REPL באשכול Managed Service for Apache Spark, מבצעים את השלבים הבאים:
משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark.
מריצים את הפקודה הבאה בטרמינל של סשן ה-SSH כדי להגדיר את סשן Spark ל-Iceberg:
pyspark \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
--conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
מחליפים את מה שכתוב בשדות הבאים:
- CLUSTER_NAME: שם האשכול.
- REGION: האזור של Compute Engine.
- CATALOG_NAME: שם קטלוג Iceberg.
- BUCKET ו-FOLDER: מיקום קטלוג Iceberg ב-Cloud Storage.
כתיבת נתונים לטבלת Iceberg
אפשר לכתוב נתונים לטבלת Iceberg באמצעות Spark. קטעי הקוד הבאים יוצרים DataFrame עם נתונים לדוגמה, יוצרים טבלת Iceberg ב-Cloud Storage ואז כותבים את הנתונים לטבלת Iceberg.
PySpark
# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
id integer,
name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")
# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Scala
// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
id integer,
name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")
// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
קריאת נתונים מטבלת Iceberg
אפשר לקרוא נתונים מטבלת Iceberg באמצעות Spark. בקטעי הקוד הבאים, המערכת קוראת את הטבלה ואז מציגה את התוכן שלה.
PySpark
# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()
Scala
// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
// Display the data.
df.show()
Spark SQL
SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME
Hive
יצירת טבלת Iceberg ב-Hive
ב-Managed Service for Apache Spark, אשכולות של Apache Spark מוגדרים מראש לעבודה עם Iceberg.
כדי להריץ את קטעי הקוד בקטע הזה, צריך לבצע את השלבים הבאים:
משתמשים ב-SSH כדי להתחבר לצומת הראשי של אשכול Managed Service for Apache Spark.
מציגים את
beelineבחלון הטרמינל של SSH.beeline -u jdbc:hive2://
אפשר ליצור ב-Hive טבלת Iceberg לא מחולקת או מחולקת למחיצות.
טבלה לא מחולקת
יוצרים טבלת Iceberg לא מחולקת ב-Hive.
CREATE TABLE my_table ( id INT, name STRING, created_at TIMESTAMP ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
טבלה מחולקת למחיצות (Partitions)
כדי ליצור טבלת Iceberg עם חלוקה למחיצות ב-Hive, מציינים את עמודות החלוקה למחיצות במשפט PARTITIONED BY.
CREATE TABLE my_partitioned_table ( id INT, name STRING ) PARTITIONED BY (date_sk INT) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
הוספת נתונים לטבלת Iceberg ב-Hive
אפשר להוסיף נתונים לטבלת Iceberg באמצעות הצהרות INSERT סטנדרטיות של Hive.
SET hive.execution.engine=mr; INSERT INTO my_table SELECT 1, 'Alice', current_timestamp();
מגבלות
- מנוע ההפעלה MR (MapReduce) נתמך רק בפעולות DML (שפת טיפול בנתונים).
- הוצאנו משימוש את ההפעלה של MR ב-Hive
3.1.3.
קריאת נתונים מטבלת Iceberg ב-Hive
כדי לקרוא נתונים מטבלת Iceberg, משתמשים בהצהרת SELECT.
SELECT * FROM my_table;
מחיקת טבלה של Iceberg ב-Hive.
כדי להסיר טבלה של Iceberg ב-Hive, משתמשים בהצהרה DROP TABLE.
DROP TABLE my_table;