Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Cloud Composer 環境の Airflow オペレーターを使用して、BigQuery で BigLake Iceberg テーブルを作成および変更する方法について説明します。
BigQuery の BigLake Iceberg テーブルについて
BigQuery 内の BigLake Iceberg テーブルは、 Google Cloudでオープン形式のレイクハウスを構築するための基盤になります。BigQuery の BigLake Iceberg テーブルは、標準の BigQuery テーブルと同じフルマネージド エクスペリエンスを提供しますが、お客様所有のストレージ バケットにデータを保存します。BigQuery の BigLake Iceberg テーブルは、オープン Iceberg テーブル形式をサポートしているため、データの単一コピーでオープンソースとサードパーティのコンピューティング エンジンとの相互運用性が向上します。
始める前に
BigQuery の BigLake Iceberg テーブルの制限事項をよく理解しておいてください。Airflow オペレーターを使用して BigLake Iceberg テーブルを操作する場合も、同じ制限が適用されます。
BigLake Iceberg テーブルが配置される Cloud Storage バケットのクラウド リソース接続を作成します。
IAM 権限が次のように割り当てられていることを確認します。
環境のサービス アカウントには、BigQuery で BigLake Iceberg テーブルを操作するための IAM ロールが必要です。BigQuery 内の Apache Iceberg 用 BigLake テーブルに記載されている手順に沿って操作します。
Cloud Resource 接続に関連付けられているサービス アカウントには、Cloud Storage でのデータの読み取りと書き込みを行うための IAM ロールが必要です。BigQuery 内の Apache Iceberg 用 BigLake テーブルに記載されている手順に沿って操作します。
BigQuery で BigLake Iceberg テーブルを作成する
BigQuery で BigLake Iceberg テーブルを作成するには、他の BigQuery テーブルの場合と同じように BigQueryCreateTableOperator を使用します。biglakeConfiguration フィールドに、テーブルの構成を指定します。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
次のように置き換えます。
PROJECT_ID: プロジェクト IDDATASET_ID: 既存のデータセット。TABLE_NAME: 作成するテーブルの名前。CONNECTION_NAME:projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID形式のクラウド リソース接続の名前。STORAGE_URI: テーブルの完全修飾 Cloud Storage URI。例:gs://example-bucket/iceberg-table
BigQuery で BigLake Iceberg テーブルに対してクエリを実行する
BigLake Iceberg テーブルを作成したら、通常どおり BigQueryInsertJobOperator を使用してクエリを実行できます。オペレーターは、BigLake Iceberg テーブル専用の追加構成を必要としません。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
次のように置き換えます。
TABLE_IDはテーブル ID に置き換えます(PROJECT_ID.DATASET_ID.TABLE_NAME形式)。