BigLake Metastore für Managed Service for Apache Spark und Spark mit Iceberg 1.9 konfigurieren

In diesem Dokument wird erläutert, wie Sie den benutzerdefinierten Iceberg Katalog für BigQuery im BigLake-Metastore mit entweder Managed Service for Apache Spark oder Managed Service for Apache Spark konfigurieren. So können Sie einen einzelnen, freigegebenen Metastore erstellen, der mit Open-Source-Engines wie Apache Spark oder Apache Flink funktioniert.

Hinweis

  1. Aktivieren Sie die Abrechnung für Ihr Google Cloud Projekt. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist .
  2. Aktivieren Sie die BigQuery API und die Managed Service for Apache Spark API.

    APIs aktivieren

  3. Informationen zum BigLake-Metastore

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Konfigurieren des BigLake-Metastore benötigen:

  • Managed Service for Apache Spark-Cluster erstellen: Dataproc-Worker (roles/dataproc.worker) für das Compute Engine-Standarddienstkonto im Projekt
  • BigLake-Metastore-Tabellen erstellen:
  • BigLake-Metastore-Tabellen abfragen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Metastore mit Managed Service for Apache Spark konfigurieren

Sie können den BigLake-Metastore mit Managed Service for Apache Spark entweder mit Spark oder Flink konfigurieren:

Spark

  1. Konfigurieren Sie einen neuen Cluster. Führen Sie den folgenden gcloud dataproc clusters create Befehl aus, um einen neuen Managed Service for Apache Spark-Cluster zu erstellen. Er enthält die Einstellungen, die Sie für die Verwendung des BigLake-Metastore benötigen:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Ersetzen Sie Folgendes:

    • CLUSTER_NAME: ein Name für Ihren Managed Service for Apache Spark-Cluster.
    • PROJECT_ID: die ID des Google Cloud Projekts in dem Sie den Cluster erstellen.
    • LOCATION: die Compute Engine-Region, in der Sie den Cluster erstellen.
  2. Senden Sie einen Spark-Job mit einer der folgenden Methoden:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,\
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,\
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,\
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud Projekts das den Managed Service for Apache Spark-Cluster enthält.
    • CLUSTER_NAME: der Name des Managed Service for Apache Spark-Clusters, den Sie zum Ausführen des Spark SQL-Jobs verwenden.
    • REGION: die Compute Engine Region, in der sich Ihr Cluster befindet.
    • BIGLAKE_ICEBERG_CATALOG_JAR: der Cloud Storage-URI des benutzerdefinierten Iceberg-Katalog-Plug-ins, das verwendet werden soll. Wählen Sie je nach Iceberg-Versionsnummer eine der folgenden Optionen aus:
      • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
      • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
    • LOCATION: der Standort der BigQuery-Ressourcen.
    • CATALOG_NAME: der Name des Spark-Katalogs, der mit Ihrem SQL-Job verwendet werden soll.
    • WAREHOUSE_DIRECTORY: der Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mit gs://.
    • SPARK_SQL_COMMAND: die Spark SQL-Abfrage, die Sie ausführen möchten. Diese Abfrage enthält die Befehle zum Erstellen Ihrer Ressourcen. Beispiel: So erstellen Sie einen Namespace und eine Tabelle.

    spark-sql-Befehlszeile

    1. Rufen Sie in der Google Cloud Console die Seite VM-Instanzen auf.

      Zu Seite „VM-Instanzen“

    2. Klicken Sie in der Zeile mit dem Namen der Haupt-VM-Instanz des Managed Service for Apache Spark-Clusters auf SSH , um eine Verbindung zu einer VM-Instanz von Managed Service for Apache Spark herzustellen. Der Name der Haupt-VM-Instanz besteht aus dem Clusternamen gefolgt vom Suffix -m. Die Ausgabe sieht etwa so aus:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. Führen Sie im Terminal den folgenden Befehl zur Initialisierung des BigLake-Metastore aus:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Ersetzen Sie Folgendes:

      • BIGLAKE_ICEBERG_CATALOG_JAR: der Cloud Storage-URI des benutzerdefinierten Iceberg-Katalog-Plug-ins, das verwendet werden soll. Wählen Sie je nach Iceberg-Versionsnummer eine der folgenden Optionen aus:
        • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
        • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
      • CATALOG_NAME: der Name des Spark-Katalogs, der mit Ihrem SQL-Job verwendet wird.
      • PROJECT_ID: die Google Cloud Projekt ID des BigLake-Metastore-Katalogs, mit dem Ihr Spark-Katalog verknüpft ist.
      • LOCATION: der Google Cloud Standort des BigLake-Metastore.
      • WAREHOUSE_DIRECTORY: der Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mit gs://.

      Nachdem Sie erfolgreich eine Verbindung zum Cluster hergestellt haben, wird in Ihrem Spark-Terminal die Eingabeaufforderung spark-sql angezeigt, mit der Sie Spark-Jobs senden können.

      spark-sql (default)>
      
  1. Erstellen Sie einen Managed Service for Apache Spark-Cluster, bei dem die optionale Flink-Komponente aktiviert ist, und stellen Sie sicher, dass Sie Managed Service for Apache Spark 2.2 oder höher verwenden.
  2. Rufen Sie in der Google Cloud Console die VM-Instanzen Seite auf.

    Zu Seite „VM-Instanzen“

  3. Klicken Sie in der Liste der VM-Instanzen auf SSH , um eine Verbindung zur Haupt-VM-Instanz des Managed Service for Apache Spark-Clusters herzustellen. Der Name der Haupt-VM-Instanz besteht aus dem Clusternamen gefolgt vom Suffix -m.

  4. Konfigurieren Sie das benutzerdefinierte Iceberg-Katalog-Plug-in für den BigLake-Metastore:

    FLINK_VERSION=1.19
    ICEBERG_VERSION=1.6.1
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.2.jar lib/
  5. Starten Sie die Flink-Sitzung in YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Erstellen Sie einen Katalog in Flink:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: die Flink-Katalog-ID, die mit einem BigLake-Metastore-Katalog verknüpft ist.
    • WAREHOUSE_DIRECTORY: der Basispfad für das Warehouse-Verzeichnis (der Cloud Storage-Ordner, in dem Flink Dateien erstellt). Dieser Wert beginnt mit gs://.
    • PROJECT_ID: die Projekt-ID des BigLake-Metastore-Katalogs, mit dem der Flink-Katalog verknüpft ist.
    • LOCATION: der Standort der BigQuery Ressourcen.

Ihre Flink-Sitzung ist jetzt mit dem BigLake-Metastore verbunden und Sie können Flink SQL-Befehle ausführen.

Nachdem Sie eine Verbindung zum BigLake-Metastore hergestellt haben, können Sie Ressourcen basierend auf den im BigLake-Metastore gespeicherten Metadaten erstellen und aufrufen.

Führen Sie beispielsweise die folgenden Befehle in Ihrer interaktiven Flink SQL-Sitzung aus, um eine Iceberg-Datenbank und -Tabelle zu erstellen.

  1. Verwenden Sie den benutzerdefinierten Iceberg-Katalog:

    USE CATALOG CATALOG_NAME;

    Ersetzen Sie CATALOG_NAME durch Ihre Flink-Katalog-ID.

  2. Erstellen Sie eine Datenbank, wodurch ein Dataset in BigQuery erstellt wird:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Ersetzen Sie DATABASE_NAME durch den Namen Ihrer neuen Datenbank.

  3. Verwenden Sie die von Ihnen erstellte Datenbank:

    USE DATABASE_NAME;
  4. Erstellen Sie eine Iceberg-Tabelle. Mit dem folgenden Befehl wird eine Beispiel-Verkaufstabelle erstellt:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Ersetzen Sie ICEBERG_TABLE_NAME durch einen Namen für Ihre neue Tabelle.

  5. Tabellenmetadaten aufrufen:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Tabellen in der Datenbank auflisten:

    SHOW TABLES;

Daten in die Tabelle aufnehmen

Nachdem Sie im vorherigen Abschnitt eine Iceberg-Tabelle erstellt haben, können Sie Flink DataGen als Datenquelle verwenden, um Echtzeitdaten in die Tabelle aufzunehmen. Die folgenden Schritte sind ein Beispiel für diesen Workflow:

  1. Erstellen Sie mit DataGen eine temporäre Tabelle:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Ersetzen Sie Folgendes:

    • DATABASE_NAME: der Name der Datenbank, in der die temporäre Tabelle gespeichert werden soll.
    • TEMP_TABLE_NAME: ein Name für Ihre temporäre Tabelle.
    • ICEBERG_TABLE_NAME: der Name der Iceberg-Tabelle, die Sie im vorherigen Abschnitt erstellt haben.
  2. Legen Sie die Parallelität auf 1 fest:

    SET 'parallelism.default' = '1';
  3. Legen Sie das Prüfpunktintervall fest:

    SET 'execution.checkpointing.interval' = '10second';
  4. Legen Sie den Prüfpunkt fest:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Starten Sie den Echtzeit-Streamingjob:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    Die Ausgabe sieht etwa so aus:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. So prüfen Sie den Status des Streamingjobs:

    1. Rufen Sie in der Google Cloud Console die Cluster Seite auf.

      Zu den Clustern

    2. Wählen Sie Ihren Cluster aus.

    3. Klicken Sie auf den Tab Weboberflächen.

    4. Klicken Sie auf den Link YARN ResourceManager.

    5. Suchen Sie auf der Benutzeroberfläche von YARN ResourceManager Ihre Flink-Sitzung und klicken Sie unter Tracking UI auf den Link ApplicationMaster.

    6. Prüfen Sie in der Spalte Status, ob der Jobstatus Running (Wird ausgeführt) lautet.

  7. Streamingdaten im Flink SQL-Client abfragen:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Streamingdaten in BigQuery abfragen:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Beenden Sie den Streamingjob im Flink SQL-Client:

    STOP JOB 'JOB_ID';

    Ersetzen Sie JOB_ID durch die Job-ID, die in der Ausgabe angezeigt wurde, als Sie den Streamingjob erstellt haben.

Metastore mit Managed Service for Apache Spark konfigurieren

Sie können den BigLake-Metastore mit Managed Service for Apache Spark entweder mit Spark SQL oder PySpark konfigurieren.

Spark SQL

  1. Erstellen Sie eine SQL-Datei mit den Spark SQL-Befehlen, die Sie im BigLake-Metastore ausführen möchten. Mit diesem Befehl werden beispielsweise ein Namespace und eine Tabelle erstellt:

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: der Katalogname, der auf Ihre Spark-Tabelle verweist.
    • NAMESPACE_NAME: der Namespace-Name, der auf Ihre Spark-Tabelle verweist.
    • TABLE_NAME: ein Tabellenname für Ihre Spark-Tabelle.
    • WAREHOUSE_DIRECTORY: der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
  2. Senden Sie einen Spark SQL-Batchjob, indem Sie den folgenden gcloud dataproc batches submit spark-sql Befehl ausführen:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Ersetzen Sie Folgendes:

    • SQL_SCRIPT_PATH: der Pfad zur SQL-Datei, die vom Batchjob verwendet wird.
    • PROJECT_ID: die ID des Google Cloud Projekts , in dem der Batchjob ausgeführt werden soll.
    • REGION: die Region, in der Ihre Arbeitslast ausgeführt wird.
    • SUBNET_NAME (optional): der Name eines VPC-Subnetzes in der REGION das die Anforderungen an das Sitzungssubnetz erfüllt.
    • BUCKET_PATH: der Speicherort des Cloud Storage-Bucket, in den Arbeitslastabhängigkeiten hochgeladen werden sollen. Das WAREHOUSE_DIRECTORY befindet sich in diesem Bucket. Das gs://-URI-Präfix des Buckets ist nicht erforderlich. Sie können den Bucket-Pfad oder den Bucket-Namen angeben, z. B. mybucketname1.
    • LOCATION: der Standort, an dem der Batchjob ausgeführt werden soll.

    Weitere Informationen zum Senden von Spark-Batchjobs, finden Sie unter Spark-Batcharbeitslast ausführen.

PySpark

  1. Erstellen Sie eine Python-Datei mit den PySpark-Befehlen, die Sie im BigLake-Metastore ausführen möchten.

    Mit dem folgenden Befehl wird beispielsweise eine Spark-Umgebung eingerichtet, um mit Iceberg-Tabellen zu interagieren, die im BigLake-Metastore gespeichert sind. Anschließend werden mit dem Befehl ein neuer Namespace und eine Iceberg-Tabelle in diesem Namespace erstellt.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud Projekts , in dem der Batchjob ausgeführt werden soll.
    • LOCATION: der Standort der BigQuery-Ressourcen.
    • CATALOG_NAME: der Katalogname, der auf Ihre Spark-Tabelle verweist.
    • TABLE_NAME: ein Tabellenname für Ihre Spark-Tabelle.
    • WAREHOUSE_DIRECTORY: der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
    • NAMESPACE_NAME: der Namespace-Name, der auf Ihre Spark-Tabelle verweist.
  2. Senden Sie den Batchjob mit dem folgenden gcloud dataproc batches submit pyspark Befehl:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Ersetzen Sie Folgendes:

    • PYTHON_SCRIPT_PATH: der Pfad zum Python-Skript, das vom Batchjob verwendet wird.
    • PROJECT_ID: die ID des Google Cloud Projekts , in dem der Batchjob ausgeführt werden soll.
    • REGION: die Region, in der Ihre Arbeitslast ausgeführt wird.
    • BUCKET_PATH: der Speicherort des Cloud Storage-Bucket, in den Arbeitslastabhängigkeiten hochgeladen werden sollen. Das gs://-URI-Präfix des Buckets ist nicht erforderlich. Sie können den Bucket-Pfad oder den Bucket-Namen angeben, z. B. mybucketname1.

    Weitere Informationen zum Senden von PySpark-Batchjobs finden Sie in der gcloud-Referenz zu PySpark.

Nächste Schritte