Membuat dan menjalankan kueri pada tabel BigLake Iceberg di BigQuery

Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)

Halaman ini menjelaskan cara membuat dan mengubah tabel BigLake Iceberg di BigQuery menggunakan operator Airflow di lingkungan Managed Airflow Anda.

Tentang tabel BigLake Iceberg di BigQuery

Tabel BigLake Iceberg di BigQuery menyediakan dasar untuk membangun lakehouse format terbuka Google Cloud. Tabel BigLake Iceberg di BigQuery menawarkan pengalaman yang sama dan dikelola sepenuhnya seperti tabel BigQuery standar, tetapi menyimpan data di bucket penyimpanan milik pelanggan. Tabel BigLake Iceberg di BigQuery mendukung format tabel Iceberg terbuka untuk interoperabilitas yang lebih baik dengan mesin komputasi open source dan pihak ketiga pada satu salinan data.

Sebelum memulai

Membuat tabel BigLake Iceberg di BigQuery

Untuk membuat tabel BigLake Iceberg di BigQuery, gunakan BigQueryCreateTableOperator dengan cara yang sama seperti untuk tabel BigQuery lainnya. Di kolom biglakeConfiguration, berikan konfigurasi untuk tabel.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

Ganti kode berikut:

  • PROJECT_ID: Project ID.
  • DATASET_ID: set data yang sudah ada.
  • TABLE_NAME: nama tabel yang Anda buat.
  • CONNECTION_NAME: nama koneksi Resource Cloud dalam format projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.
  • STORAGE_URI: Cloud Storage URI yang sepenuhnya memenuhi syarat untuk tabel. Misalnya, gs://example-bucket/iceberg-table.

Menjalankan kueri pada tabel BigLake Iceberg di BigQuery

Setelah membuat tabel BigLake Iceberg, Anda dapat menjalankan kueri tabel tersebut dengan BigQueryInsertJobOperator seperti biasa. Operator tidak memerlukan konfigurasi tambahan khusus untuk tabel BigLake Iceberg.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

Ganti kode berikut:

  • TABLE_ID dengan ID tabel, dalam format PROJECT_ID.DATASET_ID.TABLE_NAME.

Langkah berikutnya