בדף הזה מוסבר איך להשתמש בטבלאות Apache Iceberg עם שירות Dataproc Metastore שמצורף לאשכול Dataproc. Apache Iceberg הוא פורמט טבלה פתוח למערכי נתונים אנליטיים גדולים.
תאימות
טבלאות Iceberg תומכות בתכונות הבאות.
| דרייברים | בחירה | הוספה | יצירת טבלה |
|---|---|---|---|
| Spark | ✓ | ✓ | ✓ |
| כוורת | ✓ | ✓ | |
| פרסטו | ✓ | ✓ | ✓ |
לפני שמתחילים
- יוצרים שירות Dataproc Metastore.
- מצרפים Dataproc Metastore לאשכול Dataproc.
שימוש בטבלת Iceberg עם Spark
בדוגמה הבאה מוצג אופן השימוש בטבלאות Iceberg עם Spark.
טבלאות Iceberg תומכות בפעולות קריאה וכתיבה. מידע נוסף זמין במאמר בנושא Apache Iceberg – Spark.
הגדרות Spark
קודם מפעילים את Spark shell ומשתמשים בקטגוריה של Cloud Storage לאחסון נתונים. כדי לכלול את Iceberg בהתקנת Spark, מוסיפים את קובץ ה-JAR של Iceberg Spark Runtime לתיקיית ה-JAR של Spark. כדי להוריד את קובץ ה-JAR, אפשר לעיין במאמר הורדות של Apache Iceberg. הפקודה הבאה מפעילה את מעטפת Spark עם תמיכה ב-Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
שימוש בקטלוג Hive ליצירת טבלאות Iceberg
מגדירים את ההגדרות של Hive Catalog כדי ליצור טבלאות Iceberg ב-Spark Scala:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMapיוצרים טבלה כדי להוסיף ולעדכן נתונים. הנה דוגמה.
צור טבלה בשם
exampleבמסד הנתוניםdefault:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");הוספת נתונים לדוגמה:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);מציינים אסטרטגיית חלוקה לפי העמודה
id:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;יוצרים את הטבלה:
val table=catalog.createTable(name,df1_schema,partition_spec);מוסיפים את Iceberg Storage Handler ו-SerDe כמאפיין הטבלה:
table.updateProperties().set("engine.hive.enabled", "true").commit();כתיבת הנתונים לטבלה:
df1.write.format("iceberg").mode("overwrite").save("default.example");קוראים את הנתונים:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
שינוי סכימת הטבלה. הנה דוגמה.
מקבלים את הטבלה ומוסיפים עמודה חדשה
grade:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();בודקים את סכימת הטבלה החדשה:
table.schema.toString;
מוסיפים עוד נתונים וצופים בהתפתחות הסכימה. הנה דוגמה.
כדי להוסיף נתונים חדשים לטבלה:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");בודקים את הנתונים החדשים שהוספתם:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;צפייה בהיסטוריה של הטבלה:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);צפייה בתמונות המצב:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);צפייה בקובצי המניפסט:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);צפייה בקובצי הנתונים:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);נניח שטעיתם והוספתם את השורה עם הערך
id=6ואתם רוצים לחזור לגרסה הנכונה של הטבלה:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();מחליפים את
snapshot-idבגרסה שאליה רוצים לחזור.
שימוש ב-Hadoop Tables ליצירת טבלאות Iceberg
מגדירים את ההגדרות של טבלאות Hadoop כדי ליצור טבלאות Iceberg ב-Spark Scala:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._יוצרים טבלה כדי להוסיף ולעדכן נתונים. הנה דוגמה.
צור טבלה בשם
exampleבמסד הנתוניםdefault:val conf = new Configuration(); val tables = new HadoopTables(conf);הוספת נתונים לדוגמה:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);מציינים אסטרטגיית חלוקה לפי העמודה
id:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;יוצרים את הטבלה:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);כתיבת הנתונים לטבלה:
df1.write.format("iceberg").mode("overwrite").save(table_location);קוראים את הנתונים:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
שינוי סכימת הטבלה. הנה דוגמה.
מקבלים את הטבלה ומוסיפים עמודה חדשה
grade:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();בודקים את סכימת הטבלה החדשה:
table.schema.toString;
מוסיפים עוד נתונים וצופים בהתפתחות הסכימה. הנה דוגמה.
כדי להוסיף נתונים חדשים לטבלה:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);בודקים את הנתונים החדשים שהוספתם:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;צפייה בהיסטוריה של הטבלה:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);צפייה בתמונות המצב:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);צפייה בקובצי המניפסט:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);צפייה בקובצי הנתונים:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);כדי לחזור לגרסה ספציפית של הטבלה:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;מחליפים את
snapshot-idבגרסה שאליה רוצים לחזור ומוסיפים"L"בסוף. לדוגמה,"3943776515926014142L".
שימוש בטבלת Iceberg ב-Hive
Iceberg תומך בקריאת טבלאות באמצעות Hive על ידי שימוש ב-StorageHandler. הערה
יש תמיכה רק בגרסאות Hive 2.x ו-3.1.2. מידע נוסף זמין במאמר בנושא Apache Iceberg - Hive. בנוסף, מוסיפים את קובץ ה-JAR של Iceberg Hive Runtime לנתיב המחלקה של Hive. כדי להוריד את קובץ ה-JAR, אפשר לעיין במאמר הורדות של Apache Iceberg.
כדי להוסיף שכבת-על של טבלת Hive מעל טבלת Iceberg, צריך ליצור את טבלת Iceberg באמצעות קטלוג Hive או טבלת Hadoop. בנוסף, צריך להגדיר את Hive בהתאם כדי לקרוא נתונים מטבלת Iceberg.
קריאת טבלת Iceberg (קטלוג Hive) ב-Hive
פותחים את לקוח Hive ומגדירים את ההגדרות לקריאת טבלאות Iceberg בסשן של לקוח Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;קריאת סכימת הטבלה והנתונים. הנה דוגמה.
בודקים את סכימת הטבלה ואם פורמט הטבלה הוא Iceberg:
describe formatted example;קוראים את הנתונים מהטבלה:
select * from example;
קריאת טבלת Iceberg (טבלת Hadoop) ב-Hive
פותחים את לקוח Hive ומגדירים את ההגדרות לקריאת טבלאות Iceberg בסשן של לקוח Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;קריאת סכימת הטבלה והנתונים. הנה דוגמה.
יצירת טבלה חיצונית (הוספת טבלת Hive מעל טבלת Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');בודקים את סכימת הטבלה ואם פורמט הטבלה הוא Iceberg:
describe formatted hadoop_table;קוראים את הנתונים מהטבלה:
select * from hadoop_table;
שימוש בטבלת Iceberg ב-Presto
שאילתות Presto משתמשות במחבר Hive כדי לקבל מיקומי מחיצות, ולכן צריך להגדיר את Presto בהתאם כדי לקרוא ולכתוב נתונים בטבלת Iceberg. מידע נוסף זמין במאמרים Presto/Trino - Hive Connector ו-Presto/Trino - Iceberg Connector.
הגדרות Presto
בכל צומת של אשכול Dataproc, יוצרים קובץ בשם
iceberg.properties/etc/presto/conf/catalog/iceberg.propertiesומגדירים אתhive.metastore.uriבאופן הבא:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>מחליפים את
example.net:9083במארח ובפורט הנכונים של שירות Hive Metastore Thrift.מפעילים מחדש את שירות Presto כדי להעביר את ההגדרות:
sudo systemctl restart presto.service
יצירת טבלת Iceberg ב-Presto
פותחים את לקוח Presto ומשתמשים במחבר Iceberg כדי לקבל את מאגר המטא-נתונים:
--catalog iceberg --schema defaultיוצרים טבלה כדי להוסיף ולעדכן נתונים. הנה דוגמה.
צור טבלה בשם
exampleבמסד הנתוניםdefault:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);הוספת נתונים לדוגמה:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');קריאת נתונים מהטבלה:
SELECT * FROM iceberg.default.example;כדי לבדוק את תמונות המצב, מוסיפים עוד נתונים חדשים:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');צפייה בתמונות המצב:
SELECT snapshot_id FROM iceberg.default."example$snapshots";כדי למצוא את המזהה של התמונה העדכנית ביותר, מוסיפים את הפקודה
ORDER BY committed_at DESC LIMIT 1;.חזרה לגרסה ספציפית של הטבלה:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);מחליפים את
snapshot-idבגרסה שאליה רוצים לחזור.