BigQuery で BigLake Iceberg テーブルを作成してクエリを実行する

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、Cloud Composer 環境の Airflow オペレーターを使用して、BigQuery で BigLake Iceberg テーブルを作成および変更する方法について説明します。

BigQuery の BigLake Iceberg テーブルについて

BigQuery 内の BigLake Iceberg テーブルは、 Google Cloudでオープン形式のレイクハウスを構築するための基盤になります。BigQuery の BigLake Iceberg テーブルは、標準の BigQuery テーブルと同じフルマネージド エクスペリエンスを提供しますが、お客様所有のストレージ バケットにデータを保存します。BigQuery の BigLake Iceberg テーブルは、オープン Iceberg テーブル形式をサポートしているため、データの単一コピーでオープンソースとサードパーティのコンピューティング エンジンとの相互運用性が向上します。

始める前に

BigQuery で BigLake Iceberg テーブルを作成する

BigQuery で BigLake Iceberg テーブルを作成するには、他の BigQuery テーブルの場合と同じように BigQueryCreateTableOperator を使用します。biglakeConfiguration フィールドに、テーブルの構成を指定します。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • DATASET_ID: 既存のデータセット。
  • TABLE_NAME: 作成するテーブルの名前。
  • CONNECTION_NAME: projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID 形式のクラウド リソース接続の名前。
  • STORAGE_URI: テーブルの完全修飾 Cloud Storage URI。例: gs://example-bucket/iceberg-table

BigQuery で BigLake Iceberg テーブルに対してクエリを実行する

BigLake Iceberg テーブルを作成したら、通常どおり BigQueryInsertJobOperator を使用してクエリを実行できます。オペレーターは、BigLake Iceberg テーブル専用の追加構成を必要としません。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

次のように置き換えます。

  • TABLE_ID はテーブル ID に置き換えます(PROJECT_ID.DATASET_ID.TABLE_NAME 形式)。

次のステップ