Sie können zusätzliche Komponenten wie Iceberg installieren, wenn Sie einen Dataproc Cluster mit dem Feature Optionale Komponenten erstellen. Auf dieser Seite wird beschrieben, wie Sie die Iceberg-Komponente optional in einem Dataproc-Cluster installieren können.
Übersicht
Apache Iceberg ist ein offenes Tabellenformat für große analytische Datasets. Es bietet die Zuverlässigkeit und Einfachheit von SQL-Tabellen für Big Data und ermöglicht es Engines wie Spark, Trino, PrestoDB, Flink und Hive, gleichzeitig sicher mit denselben Tabellen zu arbeiten.
Wenn die Apache Iceberg-Komponente in einem Dataproc-Cluster installiert ist, werden Iceberg -Bibliotheken installiert und Spark und Hive so konfiguriert, dass sie mit Iceberg im Cluster funktionieren.
Wichtige Iceberg-Funktionen
Zu den Iceberg-Funktionen gehören:
- Schemaentwicklung:Sie können Spalten hinzufügen, entfernen oder umbenennen, ohne die gesamte Tabelle neu zu schreiben.
- Zeitreisen:Sie können historische Tabellensnapshots für Audits oder Rollbacks abfragen.
- Versteckte Partitionierung: Sie können das Datenlayout für schnellere Abfragen optimieren, ohne Nutzern Details zur Partitionierung preiszugeben.
- ACID-Transaktionen: Sie können die Datenkonsistenz gewährleisten und Konflikte vermeiden.
Kompatible Dataproc-Image-Versionen
Sie können die Iceberg-Komponente auf Dataproc-Clustern installieren, die mit Image-Version 2.2.47 und höher erstellt wurden. Die im Cluster installierte Iceberg-Version ist auf der Seite mit den Releaseversionen von 2.2 aufgeführt.
Iceberg-bezogene Attribute
Wenn Sie einen Dataproc-Cluster mit Iceberg erstellen, werden die folgenden Spark- und Hive-Attribute für die Verwendung mit Iceberg konfiguriert.
| Konfigurationsdatei | Attribut | Standardwert |
|---|---|---|
/etc/spark/conf/spark-defaults.conf |
spark.sql.extensions |
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
spark.driver.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
spark.executor.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar |
iceberg.engine.hive.enabled |
true |
Optionale Iceberg-Komponente installieren
Installieren Sie die Iceberg-Komponente, wenn Sie einen Dataproc-Cluster erstellen. Auf den Seiten mit der Liste der Dataproc-Cluster-Image-Versionen ist die Iceberg-Komponentenversion aufgeführt, die in den neuesten Dataproc Cluster-Image-Versionen enthalten ist.
Google Cloud Console
So erstellen Sie einen Dataproc-Cluster, in dem die Iceberg-Komponente installiert ist: Führen Sie die folgenden Schritte in der Google Cloud Konsole aus:
- Öffnen Sie die Seite „Dataproc Cluster erstellen “. Der Bereich Cluster einrichten ist ausgewählt.
- Wählen Sie im Abschnitt Komponenten unter Optionale Komponenten die Komponente Iceberg aus.
- Bestätigen oder geben Sie andere Clustereinstellungen an und klicken Sie dann auf Erstellen.
Google Cloud CLI
Verwenden Sie zum Erstellen eines Dataproc-Clusters, in dem die Iceberg-Komponente installiert ist, den
gcloud dataproc clusters create
Befehl mit dem --optional-components Flag.
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --optional-components=ICEBERG \ other flags ...
Ersetzen Sie Folgendes:
- CLUSTER_NAME: Der Name des neuen Clusters.
- REGION: Die Clusterregion.
REST API
Wenn Sie einen Dataproc-Cluster erstellen möchten, in dem die optionale Iceberg-Komponente installiert ist,
geben Sie die Iceberg
SoftwareConfig.Component
als Teil einer
clusters.create
Anfrage an.
Iceberg-Tabellen mit Spark und Hive verwenden
Nachdem Sie einen Dataproc-Cluster erstellt haben, in dem die optionale Iceberg-Komponente installiert ist, können Sie mit Spark und Hive Iceberg-Tabellendaten lesen und schreiben.
Spark
Spark-Sitzung für Iceberg konfigurieren
Sie können den gcloud CLI-Befehl lokal oder die spark-shell oder pyspark
REPLs (Read-Eval-Print Loops) verwenden, die auf dem Masterknoten des Dataproc-Clusters ausgeführt werden, um die Spark-Erweiterungen von Iceberg zu aktivieren und den Spark-Katalog für die Verwendung von Iceberg
Tabellen einzurichten.
gcloud
Führen Sie das folgende gcloud CLI-Beispiel in einem lokalen Terminalfenster oder in Cloud Shell aus, um einen Spark-Job zu senden und Spark-Attribute festzulegen, um die Spark-Sitzung für Iceberg zu konfigurieren.
gcloud dataproc jobs submit spark \ --cluster=CLUSTER_NAME \ --region=REGION \ --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \ other flags ...
Ersetzen Sie Folgendes:
- CLUSTER_NAME: Der Clustername.
- REGION: Die Compute Engine-Region.
- CATALOG_NAME: Der Name des Iceberg-Katalogs.
- BUCKET und FOLDER: Der Speicherort des Iceberg-Katalogs in Cloud Storage.
spark-shell
So konfigurieren Sie eine Spark-Sitzung für Iceberg mit der spark-shell-REPL im
Dataproc-Cluster:
Stellen Sie eine SSH-Verbindung zum Masterknoten des Dataproc-Clusters her.
Führen Sie im Terminal der SSH-Sitzung den folgenden Befehl aus, um die Spark Sitzung für Iceberg zu konfigurieren.
spark-shell \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
--conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
Ersetzen Sie Folgendes:
- CLUSTER_NAME: Der Clustername.
- REGION: Die Compute Engine-Region.
- CATALOG_NAME: Der Name des Iceberg-Katalogs.
- BUCKET und FOLDER: Der Speicherort des Iceberg-Katalogs in Cloud Storage.
pyspark-Shell
So konfigurieren Sie eine Spark-Sitzung für Iceberg mit der pyspark-REPL im
Dataproc-Cluster:
Stellen Sie eine SSH-Verbindung zum Masterknoten des Dataproc-Clusters her.
Führen Sie im Terminal der SSH-Sitzung den folgenden Befehl aus, um die Spark Sitzung für Iceberg zu konfigurieren:
pyspark \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
--conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
Ersetzen Sie Folgendes:
- CLUSTER_NAME: Der Clustername.
- REGION: Die Compute Engine-Region.
- CATALOG_NAME: Der Name des Iceberg-Katalogs.
- BUCKET und FOLDER: Der Speicherort des Iceberg-Katalogs in Cloud Storage.
Daten in eine Iceberg-Tabelle schreiben
Sie können mit Spark Daten in eine Iceberg-Tabelle schreiben. Die folgenden Code-Snippets erstellen ein
DataFrame
mit Beispieldaten, erstellen eine Iceberg-Tabelle in Cloud Storage,
und schreiben dann die Daten in die Iceberg-Tabelle.
PySpark
# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
id integer,
name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")
# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Scala
// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
id integer,
name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")
// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Daten aus einer Iceberg-Tabelle lesen
Sie können mit Spark Daten aus einer Iceberg-Tabelle lesen. Die folgenden Code-Snippets lesen die Tabelle und zeigen dann ihren Inhalt an.
PySpark
# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()
Scala
// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
// Display the data.
df.show()
Spark SQL
SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME
Hive
Iceberg-Tabelle in Hive erstellen
Dataproc-Cluster konfigurieren Hive vor, um mit Iceberg zu arbeiten.
Führen Sie die folgenden Schritte aus, um die Code-Snippets in diesem Abschnitt auszuführen:
Stellen Sie eine SSH-Verbindung zum Masterknoten Ihres Dataproc-Clusters her.
Rufen Sie
beelineim SSH-Terminalfenster auf.beeline -u jdbc:hive2://
Sie können in Hive eine nicht partitionierte oder partitionierte Iceberg-Tabelle erstellen.
Nicht partitionierte Tabelle
Erstellen Sie in Hive eine nicht partitionierte Iceberg-Tabelle.
CREATE TABLE my_table ( id INT, name STRING, created_at TIMESTAMP ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
Partitionierte Tabelle
Erstellen Sie in Hive eine partitionierte Iceberg-Tabelle, indem Sie die Partitionierungsspalten in der
PARTITIONED BY Klausel angeben.
CREATE TABLE my_partitioned_table ( id INT, name STRING ) PARTITIONED BY (date_sk INT) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
Daten in eine Iceberg-Tabelle in Hive einfügen
Sie können mit Standard-Hive-INSERT Anweisungen Daten in eine Iceberg-Tabelle einfügen.
SET hive.execution.engine=mr; INSERT INTO my_table SELECT 1, 'Alice', current_timestamp();
Einschränkungen
- Die MR-Ausführungs-Engine (MapReduce) wird nur für DML (Data Manipulation Language) Vorgänge unterstützt.
- Die MR-Ausführung ist in Hive
3.1.3veraltet.
Daten aus einer Iceberg-Tabelle in Hive lesen
Verwenden Sie eine SELECT-Anweisung, um Daten aus einer Iceberg-Tabelle zu lesen.
SELECT * FROM my_table;
Iceberg-Tabelle in Hive löschen
Verwenden Sie die Anweisung DROP TABLE, um eine Iceberg-Tabelle in Hive zu löschen.
DROP TABLE my_table;
Nächste Schritte
- Kurzanleitung zu Iceberg ansehen.