Membuat lakehouse dengan Spark dan katalog runtime Lakehouse

Arsitektur lakehouse menggabungkan fleksibilitas data lake dengan fitur pengelolaan data data warehouse. Dokumen ini menunjukkan cara menyiapkan lakehouse di Google Cloud. Anda menggunakan Apache Iceberg sebagai format tabel, Managed Service untuk Apache Spark untuk pemrosesan, dan Katalog REST Iceberg katalog runtime Lakehouse untuk pengelolaan metadata terpadu.

Arsitektur ini menggunakan format tabel terbuka seperti Iceberg untuk menambahkan kemampuan data warehousing, seperti transaksi dan evolusi skema, ke data di Cloud Storage. Pendekatan ini membuat satu sumber tepercaya untuk data Anda yang dapat diakses oleh berbagai mesin.

Diagram yang menunjukkan komponen arsitektur lakehouse, termasuk Managed Service untuk Apache Spark, Cloud Storage, dan Lakehouse REST Catalog.
Diagram arsitektur Lakehouse.

Sebelum memulai

  1. Login keakun Anda. Google Cloud Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Buat bucket Cloud Storage untuk menyimpan data Iceberg.

Peran yang diperlukan

Peran Identity and Access Management (IAM) tertentu diperlukan untuk menjalankan contoh di halaman ini. Bergantung pada kebijakan organisasi, peran ini mungkin sudah diberikan. Untuk memeriksa pemberian peran, lihat Apakah Anda perlu memberikan peran?.

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project,folder, dan organisasi.

Peran pengguna

Untuk mendapatkan izin yang Anda perlukan untuk membuat cluster Managed Service untuk Apache Spark, minta administrator Anda untuk memberikan peran IAM berikut:

Peran akun layanan

Untuk memastikan bahwa akun layanan default Compute Engine memiliki izin yang diperlukan untuk membuat cluster Managed Service untuk Apache Spark, minta administrator Anda untuk memberikan peran IAM Dataproc Worker (roles/dataproc.worker) ke akun layanan default Compute Engine di project.

Membuat cluster Managed Service untuk Apache Spark

Buat cluster Managed Service untuk Apache Spark dengan komponen opsional Iceberg dan Jupyter.

  1. Untuk membuat cluster, jalankan perintah gcloud berikut:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway \
        --properties 'dataproc:dataproc.lineage.enabled=true'
    

    Ganti kode berikut:

    • CLUSTER_NAME: nama untuk cluster Anda.
    • PROJECT_ID: ID Google Cloud project Anda.
    • REGION: region untuk cluster, misalnya, us-central1. Google Cloud

    Perhatikan bahwa menetapkan dataproc:dataproc.lineage.enabled=true tidak diperlukan agar Katalog REST Iceberg katalog runtime Lakehouse berfungsi dengan benar. Kode ini ditambahkan untuk pelacakan silsilah dalam contoh silsilah data di bawah.

  2. Hubungkan ke cluster menggunakan Notebook Jupyter. Anda dapat menggunakan notebook Vertex AI Workbench atau meluncurkan notebook langsung di cluster.

Mengonfigurasi sesi Spark

Di Notebook Jupyter, buat sesi Spark yang dikonfigurasi untuk menggunakan Katalog REST Iceberg katalog runtime Lakehouse.

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

Ganti kode berikut:

  • CATALOG_NAME: nama untuk katalog Iceberg Anda, misalnya, iceberg_catalog.
  • APP_NAME: nama aplikasi Spark Anda.
  • GCS_BUCKET: bucket Cloud Storage untuk menyimpan data tabel Iceberg Anda.
  • PROJECT_ID: ID Google Cloud project Anda.

Mengelola data dengan Spark SQL

Setelah mengonfigurasi sesi Spark, gunakan Spark SQL untuk melakukan operasi pengelolaan data.

  1. Membuat namespace. Di Katalog REST Iceberg katalog runtime Lakehouse, namespace sesuai dengan set data BigQuery.

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    Ganti NAMESPACE_NAME dengan nama untuk namespace Anda, misalnya, spark_lakehouse.

  2. Buat tabel dasar dalam format Iceberg dan masukkan data.

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. Buat tabel kedua untuk data baru.

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. Gabungkan data baru ke dalam tabel dasar.

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. Perbarui data dalam tabel dasar.

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. Hapus data dari tabel dasar.

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

Membuat kueri snapshot historis

Ambil versi tabel sebelumnya dengan membuat kueri ID snapshot tertentu. Operasi ini juga dikenal sebagai lintas waktu.

  1. Ambil ID snapshot versi tabel sebelum operasi MERGE, UPDATE, dan DELETE.

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    Ganti NAMESPACE_NAME dengan namespace yang Anda buat.

  2. Buat kueri tabel menggunakan ID snapshot yang diambil.

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    Output menunjukkan status tabel setelah operasi MERGE, tetapi sebelum operasi UPDATE atau DELETE.

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

Menemukan silsilah data

Anda dapat melacak pergerakan data antara tabel Katalog REST Iceberg katalog runtime Lakehouse dengan silsilah data, yang tersedia di Managed Service untuk Apache Spark 2.2 dan versi image yang lebih baru.

Contoh silsilah data

  1. Buat tabel Iceberg sumber dan target, lalu salin data.

    spark.sql("DROP TABLE IF EXISTS source_table PURGE")
    spark.sql("DROP TABLE IF EXISTS target_table PURGE")
    spark.sql("CREATE TABLE source_table (id LONG) USING iceberg")
    spark.sql("""CREATE TABLE target_table
      USING ICEBERG
      AS SELECT max(id) as top_id FROM source_table
      """)
    
  2. Di Google Cloud konsol, buka halaman Knowledge Catalog Penelusuran.

    Buka Penelusuran

  3. Telusuri salah satu tabel, lalu klik tab Lineage:

    Contoh silsilah data di halaman Knowledge Catalog di konsol Google Cloud .
    Contoh grafik silsilah data di halaman Knowledge Catalog di Google Cloud konsol.

    Silsilah data mengenali representasi logis (tabel katalog runtime Lakehouse) dan fisik (Cloud Storage) dari tabel Katalog REST Iceberg katalog runtime Lakehouse.

Masalah umum silsilah data

Di beberapa cluster Managed Service untuk Apache Spark, silsilah data lengkap mungkin tidak dibuat karena masalah library OpenLineage. Solusi: di konfigurasi sesi Spark, tetapkan properti spark.sql.catalog.{catalog_name}.uri ke https://biglake.googleapis.com/iceberg/v1beta/restcatalog.

Langkah berikutnya