Utilizzare le tabelle Apache Iceberg con Dataproc Metastore

Questa pagina spiega come utilizzare le tabelle Apache Iceberg con un servizio Dataproc Metastore collegato a un cluster Managed Service per Apache Spark. Apache Iceberg è un formato di tabella aperto per set di dati analitici di grandi dimensioni.

Compatibilità

Le tabelle Iceberg supportano le seguenti funzionalità.

Driver Seleziona Inserisci Crea tabella
Spark
Hive
Presto

Prima di iniziare

Utilizzare la tabella Iceberg con Spark

L'esempio seguente mostra come utilizzare le tabelle Iceberg con Spark.

Le tabelle Iceberg supportano le operazioni di lettura e scrittura. Per ulteriori informazioni, vedi Apache Iceberg - Spark.

Configurazioni di Spark

Innanzitutto, avvia la shell Spark e utilizza un bucket Cloud Storage per archiviare i dati. Per includere Iceberg nell'installazione di Spark, aggiungi il file JAR del runtime di Iceberg Spark alla cartella JAR di Spark. Per scaricare il file JAR, vedi Download di Apache Iceberg. Il comando seguente avvia la shell Spark con il supporto per Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Utilizzare il catalogo Hive per creare tabelle Iceberg

  1. Configura il catalogo Hive per creare tabelle Iceberg in Spark Scala:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.

    1. Crea una tabella denominata example nel database default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Inserisci dati di esempio:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Specifica la strategia di partizionamento in base alla colonna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabella:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Aggiungi Iceberg Storage Handler e SerDe come proprietà della tabella:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Scrivi i dati nella tabella:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Leggi i dati:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Modifica lo schema della tabella. Di seguito è riportato un esempio.

    1. Recupera la tabella e aggiungi una nuova colonna grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Controlla il nuovo schema della tabella:

      table.schema.toString;
      
  4. Inserisci altri dati e visualizza l'evoluzione dello schema. Di seguito è riportato un esempio.

    1. Aggiungi nuovi dati alla tabella:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Controlla i nuovi dati inseriti:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Visualizza la cronologia della tabella:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Visualizza le istantanee:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Visualizza i file manifest:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Visualizza i file di dati:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Supponiamo che tu abbia commesso un errore aggiungendo la riga con il valore id=6 e che tu voglia tornare indietro per visualizzare una versione corretta della tabella:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Sostituisci snapshot-id con la versione a cui vuoi tornare.

Utilizzare le tabelle Hadoop per creare tabelle Iceberg

  1. Configura le tabelle Hadoop per creare tabelle Iceberg in Spark Scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.

    1. Crea una tabella denominata example nel database default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserisci dati di esempio:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Specifica la strategia di partizionamento in base alla colonna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabella:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Scrivi i dati nella tabella:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Leggi i dati:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Modifica lo schema della tabella. Di seguito è riportato un esempio.

    1. Recupera la tabella e aggiungi una nuova colonna grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Controlla il nuovo schema della tabella:

      table.schema.toString;
      
  4. Inserisci altri dati e visualizza l'evoluzione dello schema. Di seguito è riportato un esempio.

    1. Aggiungi nuovi dati alla tabella:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Controlla i nuovi dati inseriti:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Visualizza la cronologia della tabella:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Visualizza le istantanee:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Visualizza i file manifest:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Visualizza i file di dati:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Torna indietro per visualizzare una versione specifica della tabella:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Sostituisci snapshot-id con la versione a cui vuoi tornare e aggiungi "L" alla fine. Ad esempio, "3943776515926014142L".

Utilizzare la tabella Iceberg su Hive

Iceberg supporta la lettura delle tabelle utilizzando Hive tramite un StorageHandler. Tieni presente che sono supportate solo le versioni 2.x e 3.1.2 di Hive. Per ulteriori informazioni, vedi Apache Iceberg - Hive. Inoltre, aggiungi il file JAR del runtime di Iceberg Hive al classpath di Hive. Per scaricare il file JAR, vedi Download di Apache Iceberg. Per sovrapporre una tabella Hive a una tabella Iceberg, devi creare la tabella Iceberg utilizzando un catalogo Hive o una tabella Hadoop. Inoltre, devi configurare Hive di conseguenza per leggere i dati dalla tabella Iceberg.

Leggere la tabella Iceberg (catalogo Hive) su Hive

  1. Apri il client Hive e configura le impostazioni per leggere le tabelle Iceberg nella sessione del client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Leggi lo schema e i dati della tabella. Di seguito è riportato un esempio.

    1. Controlla lo schema della tabella e se il formato della tabella è Iceberg:

      describe formatted example;
      
    2. Leggi i dati dalla tabella:

      select * from example;
      

Leggere la tabella Iceberg (tabella Hadoop) su Hive

  1. Apri il client Hive e configura le impostazioni per leggere le tabelle Iceberg nella sessione del client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Leggi lo schema e i dati della tabella. Di seguito è riportato un esempio.

    1. Crea una tabella esterna (sovrapponi una tabella Hive alla tabella Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Controlla lo schema della tabella e se il formato della tabella è Iceberg:

      describe formatted hadoop_table;
      
    3. Leggi i dati dalla tabella:

      select * from hadoop_table;
      

Utilizzare la tabella Iceberg su Presto

Le query Presto utilizzano il connettore Hive per ottenere le posizioni delle partizioni, quindi devi configurare Presto di conseguenza per leggere e scrivere i dati nella tabella Iceberg. Per ulteriori informazioni, vedi Connettore Hive di Presto/Trino e Connettore Iceberg di Presto/Trino.

Configurazioni di Presto

  1. In ogni nodo del cluster Managed Service per Apache Spark, crea un file denominato iceberg.properties /etc/presto/conf/catalog/iceberg.properties e configura hive.metastore.uri come segue:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Sostituisci example.net:9083 con l'host e la porta corretti per il servizio Thrift del metastore Hive.

  2. Riavvia il servizio Presto per eseguire il push delle configurazioni:

    sudo systemctl restart presto.service
    

Creare una tabella Iceberg su Presto

  1. Apri il client Presto e utilizza il connettore "Iceberg" per ottenere il metastore:

    --catalog iceberg --schema default
    
  2. Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.

    1. Crea una tabella denominata example nel database default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserisci dati di esempio:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Leggi i dati dalla tabella:

      SELECT * FROM iceberg.default.example;
      
    4. Inserisci altri nuovi dati per controllare le istantanee:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Visualizza le istantanee:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Aggiungendo il comando ORDER BY committed_at DESC LIMIT 1;, puoi trovare l'ID dell'ultima istantanea.

    6. Esegui il rollback a una versione specifica della tabella:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Sostituisci snapshot-id con la versione a cui vuoi tornare.

Passaggi successivi