Créer et interroger des tables BigLake Iceberg dans BigQuery

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page explique comment créer et modifier des tables BigLake Iceberg dans BigQuery à l'aide d'opérateurs Airflow dans votre environnement Cloud Composer.

À propos des tables BigLake Iceberg dans BigQuery

Les tables BigLake Iceberg dans BigQuery constituent la base de la création de lakehouses au format ouvert sur Google Cloud. Les tables BigLake Iceberg dans BigQuery offrent la même expérience entièrement gérée que les tables BigQuery standards, mais stockent les données dans des buckets de stockage détenus par le client. Les tables BigLake Iceberg dans BigQuery sont compatibles avec le format de table Iceberg ouvert pour une meilleure interopérabilité avec les moteurs de calcul Open Source et tiers sur une seule copie des données.

Avant de commencer

Créer une table BigLake Iceberg dans BigQuery

Pour créer une table BigLake Iceberg dans BigQuery, utilisez BigQueryCreateTableOperator de la même manière que pour les autres tables BigQuery. Dans le champ biglakeConfiguration, indiquez la configuration de la table.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet.
  • DATASET_ID : ensemble de données existant.
  • TABLE_NAME : nom de la table que vous créez.
  • CONNECTION_NAME : nom de la connexion à la ressource cloud au format projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.
  • STORAGE_URI : URI Cloud Storage complet pour le tableau. Exemple : gs://example-bucket/iceberg-table.

Interroger une table BigLake Iceberg dans BigQuery

Une fois la table BigLake Iceberg créée, vous pouvez l'interroger avec BigQueryInsertJobOperator comme d'habitude. L'opérateur n'a pas besoin de configuration supplémentaire spécifique aux tables BigLake Iceberg.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

Remplacez les éléments suivants :

  • TABLE_ID avec l'ID de la table, au format PROJECT_ID.DATASET_ID.TABLE_NAME.

Étapes suivantes