שימוש בטבלאות Apache Iceberg עם Dataproc Metastore

בדף הזה מוסבר איך להשתמש בטבלאות Apache Iceberg עם שירות Dataproc Metastore שמצורף לאשכול Dataproc. ‫Apache Iceberg הוא פורמט טבלה פתוח למערכי נתונים אנליטיים גדולים.

תאימות

טבלאות Iceberg תומכות בתכונות הבאות.

דרייברים בחירה הוספה יצירת טבלה
Spark
כוורת
פרסטו

לפני שמתחילים

שימוש בטבלת Iceberg עם Spark

בדוגמה הבאה מוצג אופן השימוש בטבלאות Iceberg עם Spark.

טבלאות Iceberg תומכות בפעולות קריאה וכתיבה. מידע נוסף זמין במאמר בנושא Apache Iceberg – Spark.

הגדרות Spark

קודם מפעילים את Spark shell ומשתמשים בקטגוריה של Cloud Storage לאחסון נתונים. כדי לכלול את Iceberg בהתקנת Spark, מוסיפים את קובץ ה-JAR של Iceberg Spark Runtime לתיקיית ה-JAR של Spark. כדי להוריד את קובץ ה-JAR, אפשר לעיין במאמר הורדות של Apache Iceberg. הפקודה הבאה מפעילה את מעטפת Spark עם תמיכה ב-Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

שימוש בקטלוג Hive ליצירת טבלאות Iceberg

  1. מגדירים את ההגדרות של Hive Catalog כדי ליצור טבלאות Iceberg ב-Spark Scala:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. יוצרים טבלה כדי להוסיף ולעדכן נתונים. הנה דוגמה.

    1. צור טבלה בשם example במסד הנתונים default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. הוספת נתונים לדוגמה:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. מציינים אסטרטגיית חלוקה לפי העמודה id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. יוצרים את הטבלה:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. מוסיפים את Iceberg Storage Handler ו-SerDe כמאפיין הטבלה:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. כתיבת הנתונים לטבלה:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. קוראים את הנתונים:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. שינוי סכימת הטבלה. הנה דוגמה.

    1. מקבלים את הטבלה ומוסיפים עמודה חדשה grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. בודקים את סכימת הטבלה החדשה:

      table.schema.toString;
      
  4. מוסיפים עוד נתונים וצופים בהתפתחות הסכימה. הנה דוגמה.

    1. כדי להוסיף נתונים חדשים לטבלה:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. בודקים את הנתונים החדשים שהוספתם:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. צפייה בהיסטוריה של הטבלה:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. צפייה בתמונות המצב:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. צפייה בקובצי המניפסט:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. צפייה בקובצי הנתונים:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. נניח שטעיתם והוספתם את השורה עם הערך id=6 ואתם רוצים לחזור לגרסה הנכונה של הטבלה:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      מחליפים את snapshot-id בגרסה שאליה רוצים לחזור.

שימוש ב-Hadoop Tables ליצירת טבלאות Iceberg

  1. מגדירים את ההגדרות של טבלאות Hadoop כדי ליצור טבלאות Iceberg ב-Spark Scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. יוצרים טבלה כדי להוסיף ולעדכן נתונים. הנה דוגמה.

    1. צור טבלה בשם example במסד הנתונים default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. הוספת נתונים לדוגמה:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. מציינים אסטרטגיית חלוקה לפי העמודה id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. יוצרים את הטבלה:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. כתיבת הנתונים לטבלה:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. קוראים את הנתונים:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. שינוי סכימת הטבלה. הנה דוגמה.

    1. מקבלים את הטבלה ומוסיפים עמודה חדשה grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. בודקים את סכימת הטבלה החדשה:

      table.schema.toString;
      
  4. מוסיפים עוד נתונים וצופים בהתפתחות הסכימה. הנה דוגמה.

    1. כדי להוסיף נתונים חדשים לטבלה:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. בודקים את הנתונים החדשים שהוספתם:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. צפייה בהיסטוריה של הטבלה:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. צפייה בתמונות המצב:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. צפייה בקובצי המניפסט:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. צפייה בקובצי הנתונים:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. כדי לחזור לגרסה ספציפית של הטבלה:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      מחליפים את snapshot-id בגרסה שאליה רוצים לחזור ומוסיפים "L" בסוף. לדוגמה, "3943776515926014142L".

שימוש בטבלת Iceberg ב-Hive

‫Iceberg תומך בקריאת טבלאות באמצעות Hive על ידי שימוש ב-StorageHandler. הערה יש תמיכה רק בגרסאות Hive 2.x ו-3.1.2. מידע נוסף זמין במאמר בנושא Apache Iceberg - Hive. בנוסף, מוסיפים את קובץ ה-JAR של Iceberg Hive Runtime לנתיב המחלקה של Hive. כדי להוריד את קובץ ה-JAR, אפשר לעיין במאמר הורדות של Apache Iceberg. כדי להוסיף שכבת-על של טבלת Hive מעל טבלת Iceberg, צריך ליצור את טבלת Iceberg באמצעות קטלוג Hive או טבלת Hadoop. בנוסף, צריך להגדיר את Hive בהתאם כדי לקרוא נתונים מטבלת Iceberg.

קריאת טבלת Iceberg (קטלוג Hive) ב-Hive

  1. פותחים את לקוח Hive ומגדירים את ההגדרות לקריאת טבלאות Iceberg בסשן של לקוח Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. קריאת סכימת הטבלה והנתונים. הנה דוגמה.

    1. בודקים את סכימת הטבלה ואם פורמט הטבלה הוא Iceberg:

      describe formatted example;
      
    2. קוראים את הנתונים מהטבלה:

      select * from example;
      

קריאת טבלת Iceberg (טבלת Hadoop) ב-Hive

  1. פותחים את לקוח Hive ומגדירים את ההגדרות לקריאת טבלאות Iceberg בסשן של לקוח Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. קריאת סכימת הטבלה והנתונים. הנה דוגמה.

    1. יצירת טבלה חיצונית (הוספת טבלת Hive מעל טבלת Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. בודקים את סכימת הטבלה ואם פורמט הטבלה הוא Iceberg:

      describe formatted hadoop_table;
      
    3. קוראים את הנתונים מהטבלה:

      select * from hadoop_table;
      

שימוש בטבלת Iceberg ב-Presto

שאילתות Presto משתמשות במחבר Hive כדי לקבל מיקומי מחיצות, ולכן צריך להגדיר את Presto בהתאם כדי לקרוא ולכתוב נתונים בטבלת Iceberg. מידע נוסף זמין במאמרים Presto/Trino - Hive Connector ו-Presto/Trino - Iceberg Connector.

הגדרות Presto

  1. בכל צומת של אשכול Dataproc, יוצרים קובץ בשם iceberg.properties /etc/presto/conf/catalog/iceberg.properties ומגדירים את hive.metastore.uri באופן הבא:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    מחליפים את example.net:9083 במארח ובפורט הנכונים של שירות Hive Metastore Thrift.

  2. מפעילים מחדש את שירות Presto כדי להעביר את ההגדרות:

    sudo systemctl restart presto.service
    

יצירת טבלת Iceberg ב-Presto

  1. פותחים את לקוח Presto ומשתמשים במחבר Iceberg כדי לקבל את מאגר המטא-נתונים:

    --catalog iceberg --schema default
    
  2. יוצרים טבלה כדי להוסיף ולעדכן נתונים. הנה דוגמה.

    1. צור טבלה בשם example במסד הנתונים default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. הוספת נתונים לדוגמה:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. קריאת נתונים מהטבלה:

      SELECT * FROM iceberg.default.example;
      
    4. כדי לבדוק את תמונות המצב, מוסיפים עוד נתונים חדשים:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. צפייה בתמונות המצב:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      כדי למצוא את המזהה של התמונה העדכנית ביותר, מוסיפים את הפקודה ORDER BY committed_at DESC LIMIT 1;.

    6. חזרה לגרסה ספציפית של הטבלה:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      מחליפים את snapshot-id בגרסה שאליה רוצים לחזור.

המאמרים הבאים