Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页介绍如何在 Cloud Composer 环境中使用 Airflow 运算符在 BigQuery 中创建和修改 BigLake Iceberg 表。
关于 BigQuery 中的 BigLake Iceberg 表
BigQuery 中的 BigLake Iceberg 表为在 Google Cloud上构建开放格式湖仓一体提供了基础。BigQuery 中的 BigLake Iceberg 表提供与标准 BigQuery 表相同的全代管式体验,但将数据存储在客户拥有的存储分区中。BigQuery 中的 BigLake Iceberg 表支持开放式 Iceberg 表格式,可在单个数据副本上与开源和第三方计算引擎实现更好的互操作性。
准备工作
请务必熟悉 BigQuery 中 BigLake Iceberg 表的限制。通过 Airflow 运算符处理 BigLake Iceberg 表时,也存在相同的限制。
为 BigLake Iceberg 表将要位于的 Cloud Storage 存储桶创建 Cloud 资源连接。
确保 IAM 权限按如下方式分配:
环境的服务账号必须具有在 BigQuery 中使用 BigLake Iceberg 表的 IAM 角色。 按照适用于 Apache Iceberg 的 BigQuery BigLake 表中所述的说明操作。
与 Cloud 资源连接关联的服务账号必须具有在 Cloud Storage 中读取和写入数据的 IAM 角色。按照适用于 Apache Iceberg 的 BigQuery BigLake 表中所述的说明操作。
在 BigQuery 中创建 BigLake Iceberg 表
如需在 BigQuery 中创建 BigLake Iceberg 表,请使用 BigQueryCreateTableOperator,就像创建其他 BigQuery 表一样。在 biglakeConfiguration 字段中,提供表的配置。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
替换以下内容:
PROJECT_ID:项目 ID。DATASET_ID:现有数据集。TABLE_NAME:您要创建的表的名称。CONNECTION_NAME:云资源连接的名称,格式为projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID。STORAGE_URI:表的完全限定的 Cloud Storage URI。例如gs://example-bucket/iceberg-table。
在 BigQuery 中查询 BigLake Iceberg 表
创建 BigLake Iceberg 表后,您可以像往常一样使用 BigQueryInsertJobOperator 查询该表。操作员无需专门针对 BigLake Iceberg 表进行额外配置。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
替换以下内容:
TABLE_ID,格式为PROJECT_ID.DATASET_ID.TABLE_NAME。