在 BigQuery 中创建和查询 BigLake Iceberg 表

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页介绍如何在 Cloud Composer 环境中使用 Airflow 运算符在 BigQuery 中创建和修改 BigLake Iceberg 表。

关于 BigQuery 中的 BigLake Iceberg 表

BigQuery 中的 BigLake Iceberg 表为在 Google Cloud上构建开放格式湖仓一体提供了基础。BigQuery 中的 BigLake Iceberg 表提供与标准 BigQuery 表相同的全代管式体验,但将数据存储在客户拥有的存储分区中。BigQuery 中的 BigLake Iceberg 表支持开放式 Iceberg 表格式,可在单个数据副本上与开源和第三方计算引擎实现更好的互操作性。

准备工作

在 BigQuery 中创建 BigLake Iceberg 表

如需在 BigQuery 中创建 BigLake Iceberg 表,请使用 BigQueryCreateTableOperator,就像创建其他 BigQuery 表一样。在 biglakeConfiguration 字段中,提供表的配置。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

替换以下内容:

  • PROJECT_ID项目 ID
  • DATASET_ID:现有数据集。
  • TABLE_NAME:您要创建的表的名称。
  • CONNECTION_NAME云资源连接的名称,格式为 projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID
  • STORAGE_URI:表的完全限定的 Cloud Storage URI。例如 gs://example-bucket/iceberg-table

在 BigQuery 中查询 BigLake Iceberg 表

创建 BigLake Iceberg 表后,您可以像往常一样使用 BigQueryInsertJobOperator 查询该表。操作员无需专门针对 BigLake Iceberg 表进行额外配置。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

替换以下内容:

  • TABLE_ID,格式为 PROJECT_ID.DATASET_ID.TABLE_NAME

后续步骤