Membuat dan menjalankan kueri pada tabel BigLake Iceberg di BigQuery

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini menjelaskan cara membuat dan mengubah tabel BigLake Iceberg di BigQuery menggunakan operator Airflow di lingkungan Cloud Composer Anda.

Tentang tabel BigLake Iceberg di BigQuery

Tabel BigLake Iceberg di BigQuery memberikan fondasi untuk membangun lakehouse berformat terbuka di Google Cloud. Tabel Iceberg BigLake di BigQuery menawarkan pengalaman terkelola sepenuhnya yang sama dengan tabel BigQuery standar, tetapi menyimpan data di bucket penyimpanan milik pelanggan. Tabel Iceberg BigLake di BigQuery mendukung format tabel Iceberg terbuka untuk interoperabilitas yang lebih baik dengan mesin komputasi open source dan pihak ketiga pada satu salinan data.

Sebelum memulai

Membuat tabel BigLake Iceberg di BigQuery

Untuk membuat tabel BigLake Iceberg di BigQuery, gunakan BigQueryCreateTableOperator dengan cara yang sama seperti untuk tabel BigQuery lainnya. Di kolom biglakeConfiguration, berikan konfigurasi untuk tabel.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

Ganti kode berikut:

  • PROJECT_ID: Project ID.
  • DATASET_ID: set data yang sudah ada.
  • TABLE_NAME: nama tabel yang Anda buat.
  • CONNECTION_NAME: nama koneksi Resource Cloud dalam format projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.
  • STORAGE_URI: Cloud Storage URI yang sepenuhnya memenuhi syarat untuk tabel. Contohnya, gs://example-bucket/iceberg-table.

Menjalankan kueri pada tabel Iceberg BigLake di BigQuery

Setelah membuat tabel BigLake Iceberg, Anda dapat membuat kuerinya dengan BigQueryInsertJobOperator seperti biasa. Operator tidak memerlukan konfigurasi tambahan khusus untuk tabel BigLake Iceberg.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

Ganti kode berikut:

  • TABLE_ID dengan ID tabel, dalam format PROJECT_ID.DATASET_ID.TABLE_NAME.

Langkah berikutnya