Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 Cloud Composer 環境中使用 Airflow 運算子,在 BigQuery 中建立及修改 BigLake Iceberg 資料表。
關於 BigQuery 中的 BigLake Iceberg 資料表
BigQuery 中的 BigLake Iceberg 資料表是您在 Google Cloud上建構開放格式 lakehouse 的基礎。BigQuery 中的 BigLake Iceberg 資料表提供與標準 BigQuery 資料表相同的全代管體驗,但資料會儲存在客戶擁有的儲存空間值區中。BigQuery 中的 BigLake Iceberg 資料表支援開放式 Iceberg 資料表格式,可與單一資料副本上的開放原始碼和第三方運算引擎互通。
事前準備
請務必熟悉 BigQuery 中 BigLake Iceberg 資料表的限制。透過 Airflow 運算子使用 BigLake Iceberg 資料表時,也適用同樣的限制。
為 BigLake Iceberg 資料表所在的 Cloud Storage bucket 建立 Cloud 資源連線。
請確認已指派下列 IAM 權限:
環境的服務帳戶必須具備 IAM 角色,才能在 BigQuery 中使用 BigLake Iceberg 資料表。請按照「BigQuery 中的 Apache Iceberg 專用 BigLake 資料表」一文中的指示操作。
與 Cloud Resource 連線相關聯的服務帳戶必須具備 IAM 角色,才能在 Cloud Storage 中讀取及寫入資料。請按照「BigQuery 中的 Apache Iceberg 專用 BigLake 資料表」一文中的指示操作。
在 BigQuery 中建立 BigLake Iceberg 資料表
如要在 BigQuery 中建立 BigLake Iceberg 資料表,請使用 BigQueryCreateTableOperator,方法與其他 BigQuery 資料表相同。在 biglakeConfiguration 欄位中,提供表格的設定。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
更改下列內容:
PROJECT_ID:專案 ID。DATASET_ID:現有資料集。TABLE_NAME:您要建立的資料表名稱。CONNECTION_NAME:Cloud Resource 連線的名稱,格式為projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID。STORAGE_URI:資料表的完整 Cloud Storage URI。例如:gs://example-bucket/iceberg-table。
在 BigQuery 中查詢 BigLake Iceberg 資料表
建立 BigLake Iceberg 資料表後,您就可以照常使用 BigQueryInsertJobOperator 查詢。運算子不需要為 BigLake Iceberg 資料表進行額外設定。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
更改下列內容:
TABLE_ID,並以PROJECT_ID.DATASET_ID.TABLE_NAME格式提供資料表 ID。