Créer et interroger des tables BigLake Iceberg dans BigQuery

Airflow géré (3e génération) | Airflow géré (2e génération) | Airflow géré (1re génération héritée)

Cette page explique comment créer et modifier des tables BigLake Iceberg dans BigQuery à l'aide d'opérateurs Airflow dans votre environnement Airflow géré.

À propos des tables BigLake Iceberg dans BigQuery

Les tables BigLake Iceberg dans BigQuery constituent la base de la création de lakehouses au format ouvert sur Google Cloud. Les tables BigLake Iceberg dans BigQuery offrent la même expérience entièrement gérée que les tables BigQuery standards, mais stockent les données dans des buckets de stockage détenus par le client. Les tables BigLake Iceberg dans BigQuery sont compatibles avec le format de table Iceberg ouvert pour une meilleure interopérabilité avec les moteurs de calcul Open Source et tiers sur une seule copie des données.

Avant de commencer

Créer une table BigLake Iceberg dans BigQuery

Pour créer une table BigLake Iceberg dans BigQuery, utilisez BigQueryCreateTableOperator de la même manière que pour les autres tables BigQuery. Dans le champ biglakeConfiguration, fournissez la configuration de la table.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

Remplacez les éléments suivants :

Interroger une table BigLake Iceberg dans BigQuery

Une fois la table BigLake Iceberg créée, vous pouvez l'interroger avec BigQueryInsertJobOperator comme d'habitude. L'opérateur n'a pas besoin de configuration supplémentaire spécifique aux tables BigLake Iceberg.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

Remplacez les éléments suivants :

  • TABLE_ID par l'ID de la table, au format PROJECT_ID.DATASET_ID.TABLE_NAME.

Étape suivante