Membuat lakehouse dengan Spark dan metastore BigLake

Arsitektur lakehouse menggabungkan fleksibilitas data lake dengan fitur pengelolaan data dari data warehouse. Dokumen ini menunjukkan cara menyiapkan lakehouse di Google Cloud. Anda menggunakan Apache Iceberg sebagai format tabel, Managed Service for Apache Spark untuk pemrosesan, dan Katalog REST Iceberg metastore BigLake untuk pengelolaan metadata terpadu.

Arsitektur ini menggunakan format tabel terbuka seperti Iceberg untuk menambahkan kemampuan data warehouse, seperti transaksi dan evolusi skema, ke data di Cloud Storage. Pendekatan ini menciptakan satu sumber tepercaya untuk data Anda yang dapat diakses oleh berbagai mesin.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Buat bucket Cloud Storage untuk menyimpan data Iceberg.

Peran yang diperlukan

Peran IAM tertentu diperlukan untuk menjalankan contoh di halaman ini. Bergantung pada kebijakan organisasi, peran ini mungkin sudah diberikan. Untuk memeriksa pemberian peran, lihat Apakah Anda perlu memberikan peran?.

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project,folder, dan organisasi.

Peran pengguna

Untuk mendapatkan izin yang Anda perlukan untuk membuat cluster Managed Service for Apache Spark, minta administrator untuk memberi Anda peran IAM berikut:

Peran akun layanan

Untuk memastikan bahwa akun layanan default Compute Engine memiliki izin yang diperlukan untuk membuat cluster Managed Service for Apache Spark, minta administrator Anda untuk memberikan peran IAM Dataproc Worker (roles/dataproc.worker) kepada akun layanan default Compute Engine di project.

Membuat cluster Managed Service untuk Apache Spark

Buat cluster Managed Service for Apache Spark dengan komponen opsional Iceberg dan Jupyter.

  1. Untuk membuat cluster, jalankan perintah gcloud berikut:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway
    

    Ganti kode berikut:

    • CLUSTER_NAME: nama untuk cluster Anda.
    • PROJECT_ID: Google Cloud Project ID Anda.
    • REGION: Google Cloud region untuk cluster, misalnya, us-central1.
  2. Hubungkan ke cluster menggunakan Jupyter Notebook. Anda dapat menggunakan notebook Vertex AI Workbench atau meluncurkan notebook langsung di cluster.

Mengonfigurasi sesi Spark

Di Jupyter Notebook, buat sesi Spark yang dikonfigurasi untuk menggunakan Katalog REST Iceberg BigLake Metastore.

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

Ganti kode berikut:

  • CATALOG_NAME: nama untuk katalog Iceberg Anda, misalnya, iceberg_catalog.
  • APP_NAME: nama aplikasi Spark Anda.
  • GCS_BUCKET: bucket Cloud Storage untuk menyimpan data tabel Iceberg Anda.
  • PROJECT_ID: Google Cloud Project ID Anda.

Mengelola data dengan Spark SQL

Setelah mengonfigurasi sesi Spark, gunakan Spark SQL untuk melakukan operasi pengelolaan data.

  1. Membuat namespace. Di Katalog REST Iceberg BigLake Metastore, namespace sesuai dengan set data BigQuery.

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    Ganti NAMESPACE_NAME dengan nama untuk namespace Anda, misalnya, spark_lakehouse.

  2. Buat tabel dasar dalam format Iceberg dan masukkan data.

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. Buat tabel kedua untuk data baru.

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. Gabungkan data baru ke dalam tabel dasar.

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. Memperbarui data dalam tabel dasar.

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. Menghapus data dari tabel dasar.

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    Outputnya mirip dengan hal berikut ini:

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

Mengueri snapshot historis

Mengambil versi tabel sebelumnya dengan membuat kueri ID snapshot tertentu. Operasi ini juga dikenal sebagai time travel.

  1. Ambil ID snapshot versi tabel sebelum operasi MERGE, UPDATE, dan DELETE.

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    Ganti NAMESPACE_NAME dengan namespace yang Anda buat.

  2. Buat kueri tabel menggunakan ID snapshot yang diambil.

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    Output menampilkan status tabel setelah operasi MERGE, tetapi sebelum operasi UPDATE atau DELETE.

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

Langkah berikutnya