在 BigQuery 中建立及查詢 BigLake Iceberg 資料表

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在 Cloud Composer 環境中使用 Airflow 運算子,在 BigQuery 中建立及修改 BigLake Iceberg 資料表。

關於 BigQuery 中的 BigLake Iceberg 資料表

BigQuery 中的 BigLake Iceberg 資料表是您在 Google Cloud上建構開放格式 lakehouse 的基礎。BigQuery 中的 BigLake Iceberg 資料表提供與標準 BigQuery 資料表相同的全代管體驗,但資料會儲存在客戶擁有的儲存空間值區中。BigQuery 中的 BigLake Iceberg 資料表支援開放式 Iceberg 資料表格式,可與單一資料副本上的開放原始碼和第三方運算引擎互通。

事前準備

在 BigQuery 中建立 BigLake Iceberg 資料表

如要在 BigQuery 中建立 BigLake Iceberg 資料表,請使用 BigQueryCreateTableOperator,方法與其他 BigQuery 資料表相同。在 biglakeConfiguration 欄位中,提供表格的設定。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

更改下列內容:

  • PROJECT_ID專案 ID
  • DATASET_ID:現有資料集。
  • TABLE_NAME:您要建立的資料表名稱。
  • CONNECTION_NAMECloud Resource 連線的名稱,格式為 projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID
  • STORAGE_URI:資料表的完整 Cloud Storage URI。例如:gs://example-bucket/iceberg-table

在 BigQuery 中查詢 BigLake Iceberg 資料表

建立 BigLake Iceberg 資料表後,您就可以照常使用 BigQueryInsertJobOperator 查詢。運算子不需要為 BigLake Iceberg 資料表進行額外設定。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

更改下列內容:

  • TABLE_ID,並以 PROJECT_ID.DATASET_ID.TABLE_NAME 格式提供資料表 ID。

後續步驟