Mengonfigurasi katalog runtime Lakehouse untuk Managed Service for Apache Spark menggunakan Iceberg 1.10

Dokumen ini menjelaskan cara mengonfigurasi Katalog Apache Iceberg kustom untuk BigQuery dalam katalog runtime Lakehouse.

Anda dapat menyiapkannya menggunakan cluster Managed Service for Apache Spark atau Managed Service for Apache Spark. Hal ini membuat satu katalog bersama di seluruh Google Cloud Lakehouse yang berfungsi secara lancar dengan mesin open source seperti Apache Spark dan Apache Flink.

Sebelum memulai

  1. Aktifkan penagihan untuk project Google Cloud Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
  2. Aktifkan BigQuery API dan Managed Service for Apache Spark API.

    Aktifkan API

  3. Pahami katalog runtime Lakehouse.

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk mengonfigurasi katalog runtime Lakehouse, minta administrator untuk memberi Anda peran IAM berikut:

  • Buat cluster Managed Service for Apache Spark: Dataproc Worker (roles/dataproc.worker) di akun layanan default Compute Engine dalam project
  • Buat tabel katalog runtime Lakehouse:
    • Dataproc Worker (roles/dataproc.worker) di akun layanan VM Managed Service for Apache Spark dalam project
    • BigQuery Data Editor (roles/bigquery.dataEditor) di akun layanan VM Managed Service for Apache Spark dalam project
    • Pengguna Objek Penyimpanan (roles/storage.objectUser) di akun layanan VM Managed Service for Apache Spark dalam project
  • Membuat kueri tabel katalog runtime Lakehouse:

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Mengonfigurasi metastore dengan Managed Service for Apache Spark

Anda dapat mengonfigurasi katalog runtime Lakehouse dengan Managed Service untuk Apache Spark menggunakan Spark atau Flink:

Spark

  1. Konfigurasi cluster baru. Untuk membuat cluster Managed Service for Apache Spark baru, jalankan perintah gcloud dataproc clusters create berikut, yang berisi setelan yang perlu Anda gunakan untuk katalog runtime Lakehouse:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Ganti kode berikut:

    • CLUSTER_NAME: nama untuk cluster Managed Service for Apache Spark Anda.
    • PROJECT_ID: ID Google Cloud project tempat Anda membuat cluster.
    • LOCATION: region Compute Engine tempat Anda membuat cluster.
  2. Kirimkan tugas Spark menggunakan salah satu metode berikut:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
        spark.sql.catalog.CATALOG_NAME.type=bigquery,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Ganti kode berikut:

    • PROJECT_ID: ID Google Cloud project yang berisi cluster Managed Service for Apache Spark.
    • CLUSTER_NAME: nama cluster Managed Service for Apache Spark yang Anda gunakan untuk menjalankan tugas Spark SQL.
    • REGION: region Compute Engine tempat cluster Anda berada.
    • LOCATION: lokasi resource BigQuery.
    • CATALOG_NAME: nama katalog Spark yang akan digunakan dengan tugas SQL Anda.
    • WAREHOUSE_DIRECTORY: folder Cloud Storage yang berisi data warehouse Anda. Nilai ini diawali dengan gs://.
    • SPARK_SQL_COMMAND: kueri Spark SQL yang ingin Anda jalankan. Kueri ini mencakup perintah untuk membuat resource Anda. Misalnya, untuk membuat namespace dan tabel.

    spark-sql CLI

    1. Di konsol Google Cloud , buka halaman VM Instances.

      Buka Instance VM

    2. Untuk terhubung ke instance VM Managed Service for Apache Spark, klik SSH di baris yang mencantumkan nama instance VM utama cluster Managed Service for Apache Spark, yaitu nama cluster yang diikuti dengan akhiran -m. Outputnya mirip dengan hal berikut ini:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. Di terminal, jalankan perintah inisialisasi katalog runtime Lakehouse berikut:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Ganti kode berikut:

      • CATALOG_NAME: nama katalog Spark yang Anda gunakan dengan tugas SQL.
      • PROJECT_ID: Google Cloud project ID katalog Lakehouse runtime yang ditautkan dengan katalog Spark Anda.
      • LOCATION: Google Cloud lokasi katalog runtime Lakehouse.
      • WAREHOUSE_DIRECTORY: folder Cloud Storage yang berisi data warehouse Anda. Nilai ini diawali dengan gs://.

      Setelah berhasil terhubung ke cluster, terminal Spark Anda akan menampilkan perintah spark-sql, yang dapat Anda gunakan untuk mengirimkan tugas Spark.

      spark-sql (default)>
      
  1. Buat cluster Managed Service for Apache Spark dengan komponen Flink opsional yang diaktifkan, dan pastikan Anda menggunakan Managed Service for Apache Spark 2.2 atau yang lebih baru.
  2. Di konsol Google Cloud , buka halaman VM instances.

    Buka instance VM

  3. Di daftar instance virtual machine, klik SSH untuk terhubung ke instance VM cluster Managed Service for Apache Spark utama, yang tercantum sebagai nama cluster yang diikuti dengan akhiran -m.

  4. Konfigurasi plugin katalog kustom Apache Iceberg untuk katalog runtime Lakehouse:

    FLINK_VERSION=1.20
    ICEBERG_VERSION=1.10.0
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
  5. Mulai sesi Flink di YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Buat katalog di Flink:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp.bigquery.project-id'='PROJECT_ID',
    'gcp.bigquery.location'='LOCATION'
    );

    Ganti kode berikut:

    • CATALOG_NAME: ID katalog Flink, yang ditautkan ke katalog Lakehouse runtime.
    • WAREHOUSE_DIRECTORY: jalur dasar untuk direktori gudang (folder Cloud Storage tempat Flink membuat file). Nilai ini diawali dengan gs://.
    • PROJECT_ID: project ID katalog Lakehouse runtime yang ditautkan oleh katalog Flink.
    • LOCATION: lokasi resource BigQuery.

Sesi Flink Anda kini terhubung ke katalog runtime Lakehouse, dan Anda dapat menjalankan perintah Flink SQL.

Setelah terhubung ke katalog runtime Lakehouse, Anda dapat membuat dan melihat resource berdasarkan metadata yang disimpan di katalog runtime Lakehouse.

Misalnya, coba jalankan perintah berikut di sesi Flink SQL interaktif Anda untuk membuat database dan tabel Apache Iceberg.

  1. Gunakan katalog Apache Iceberg kustom:

    USE CATALOG CATALOG_NAME;

    Ganti CATALOG_NAME dengan ID katalog Flink Anda.

  2. Buat database, yang akan membuat set data di BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Ganti DATABASE_NAME dengan nama database baru Anda.

  3. Gunakan database yang Anda buat:

    USE DATABASE_NAME;
  4. Buat tabel Apache Iceberg. Contoh berikut membuat tabel penjualan contoh:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Ganti ICEBERG_TABLE_NAME dengan nama untuk tabel baru Anda.

  5. Melihat metadata tabel:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Mencantumkan tabel dalam database:

    SHOW TABLES;

Menyerap data ke dalam tabel

Setelah membuat tabel Apache Iceberg di bagian sebelumnya, Anda dapat menggunakan Flink DataGen sebagai sumber data untuk menyerap data real-time ke dalam tabel. Langkah-langkah berikut adalah contoh alur kerja ini:

  1. Buat tabel sementara menggunakan DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Ganti kode berikut:

    • DATABASE_NAME: nama database untuk menyimpan tabel sementara Anda.
    • TEMP_TABLE_NAME: nama untuk tabel sementara Anda.
    • ICEBERG_TABLE_NAME: nama tabel Apache Iceberg yang Anda buat di bagian sebelumnya.
  2. Tetapkan paralelisme ke 1:

    SET 'parallelism.default' = '1';
  3. Menetapkan interval titik pemeriksaan:

    SET 'execution.checkpointing.interval' = '10second';
  4. Tetapkan titik pemeriksaan:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Mulai tugas streaming real-time:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    Outputnya mirip dengan hal berikut ini:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Untuk memeriksa status tugas streaming, lakukan hal berikut:

    1. Di konsol Google Cloud , buka halaman Clusters.

      Buka Cluster

    2. Pilih cluster Anda.

    3. Klik tab Antarmuka web.

    4. Klik link YARN ResourceManager.

    5. Di antarmuka YARN ResourceManager, temukan sesi Flink Anda, lalu klik link ApplicationMaster di bagian Tracking UI.

    6. Di kolom Status, konfirmasi bahwa status tugas Anda adalah Berjalan.

  7. Buat kueri data streaming di klien Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Mengkueri data streaming di BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Hentikan tugas streaming di klien Flink SQL:

    STOP JOB 'JOB_ID';

    Ganti JOB_ID dengan ID tugas yang ditampilkan di output saat Anda membuat tugas streaming.

Mengonfigurasi metastore dengan Managed Service for Apache Spark

Anda dapat mengonfigurasi katalog runtime Lakehouse dengan Managed Service untuk Apache Spark menggunakan Spark SQL atau PySpark.

Spark SQL

  1. Buat file SQL dengan perintah Spark SQL yang ingin Anda jalankan di katalog runtime Lakehouse. Misalnya, perintah ini akan membuat namespace dan tabel:

    SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`;
    SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`;
    SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`;
    
    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Ganti kode berikut:

    • CATALOG_NAME: nama katalog yang mereferensikan tabel Spark Anda.
    • NAMESPACE_NAME: nama namespace yang mereferensikan tabel Spark Anda.
    • TABLE_NAME: nama tabel untuk tabel Spark Anda.
    • WAREHOUSE_DIRECTORY: URI folder Cloud Storage tempat data warehouse Anda disimpan.
  2. Kirimkan tugas batch Spark SQL dengan menjalankan perintah gcloud dataproc batches submit spark-sql berikut:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    Ganti kode berikut:

    • SQL_SCRIPT_PATH: jalur ke file SQL yang digunakan oleh tugas batch.
    • PROJECT_ID: ID Google Cloud project untuk menjalankan tugas batch.
    • REGION: region tempat workload Anda berjalan.
    • SUBNET_NAME (opsional): nama subnet VPC di REGION yang memenuhi persyaratan subnet sesi.
    • BUCKET_PATH: lokasi bucket Cloud Storage untuk mengupload dependensi workload. WAREHOUSE_DIRECTORY berada di bucket ini. Awalan URI gs:// bucket tidak diperlukan. Anda dapat menentukan jalur bucket atau nama bucket, misalnya, mybucketname1.
    • LOCATION: lokasi untuk menjalankan tugas batch.

    Untuk mengetahui informasi selengkapnya tentang cara mengirimkan tugas batch Spark, lihat Menjalankan workload batch Spark.

PySpark

  1. Buat file Python dengan perintah PySpark yang ingin Anda jalankan di katalog runtime Lakehouse.

    Misalnya, perintah berikut menyiapkan lingkungan Spark untuk berinteraksi dengan tabel Apache Iceberg yang disimpan dalam katalog runtime Lakehouse. Kemudian, perintah akan membuat namespace baru dan tabel Apache Iceberg dalam namespace tersebut.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("Lakehouse runtime catalog Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Ganti kode berikut:

    • PROJECT_ID: ID Google Cloud project untuk menjalankan tugas batch.
    • LOCATION: lokasi tempat resource BigQuery berada.
    • CATALOG_NAME: nama katalog yang mereferensikan tabel Spark Anda.
    • TABLE_NAME: nama tabel untuk tabel Spark Anda.
    • WAREHOUSE_DIRECTORY: URI folder Cloud Storage tempat data warehouse Anda disimpan.
    • NAMESPACE_NAME: nama namespace yang mereferensikan tabel Spark Anda.
  2. Kirimkan tugas batch menggunakan gcloud dataproc batches submit pyspark perintah berikut:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    Ganti kode berikut:

    • PYTHON_SCRIPT_PATH: jalur ke skrip Python yang digunakan tugas batch.
    • PROJECT_ID: ID Google Cloud project untuk menjalankan tugas batch.
    • REGION: region tempat workload Anda berjalan.
    • BUCKET_PATH: lokasi bucket Cloud Storage untuk mengupload dependensi workload. Awalan URI gs:// bucket tidak diperlukan. Anda dapat menentukan jalur bucket atau nama bucket, misalnya, mybucketname1.

    Untuk mengetahui informasi selengkapnya tentang cara mengirimkan tugas batch PySpark, lihat referensi gcloud PySpark.

Langkah berikutnya