Airflow géré (3e génération) | Airflow géré (2e génération) | Airflow géré (1re génération héritée)
Cette page explique comment créer et modifier des tables BigLake Iceberg dans BigQuery à l'aide d'opérateurs Airflow dans votre environnement Airflow géré.
À propos des tables BigLake Iceberg dans BigQuery
Les tables BigLake Iceberg dans BigQuery constituent la base de la création de lakehouses au format ouvert sur Google Cloud. Les tables BigLake Iceberg dans BigQuery offrent la même expérience entièrement gérée que les tables BigQuery standards, mais stockent les données dans des buckets de stockage détenus par le client. Les tables BigLake Iceberg dans BigQuery sont compatibles avec le format de table Iceberg ouvert pour une meilleure interopérabilité avec les moteurs de calcul Open Source et tiers sur une seule copie des données.
Avant de commencer
Assurez-vous de connaître les limites des tables BigLake Iceberg dans BigQuery. Les mêmes limites s'appliquent lorsque vous utilisez des tables BigLake Iceberg via des opérateurs Airflow.
Créez une connexion de ressource Cloud pour le bucket Cloud Storage dans lequel se trouvera votre table BigLake Iceberg.
Assurez-vous que les autorisations IAM sont attribuées comme suit :
Le compte de service de votre environnement doit disposer de rôles IAM pour utiliser des tables BigLake Iceberg dans BigQuery. Suivez les instructions décrites dans Tables BigLake pour Apache Iceberg dans BigQuery.
Le compte de service associé à la connexion de ressource Cloud doit disposer de rôles IAM pour lire et écrire des données dans Cloud Storage. Suivez les instructions décrites dans Tables BigLake pour Apache Iceberg dans BigQuery.
Créer une table BigLake Iceberg dans BigQuery
Pour créer une table BigLake Iceberg dans BigQuery, utilisez BigQueryCreateTableOperator de la même manière que pour les autres tables BigQuery. Dans le champ biglakeConfiguration, fournissez la configuration de la table.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Remplacez les éléments suivants :
PROJECT_ID: l' ID du projet.DATASET_ID: ensemble de données existant.TABLE_NAME: nom de la table que vous créez.CONNECTION_NAME: nom de la connexion de ressource Cloud au formatprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: URI Cloud Storage complet pour la table. Exemple :gs://example-bucket/iceberg-table.
Interroger une table BigLake Iceberg dans BigQuery
Une fois la table BigLake Iceberg créée, vous pouvez l'interroger avec BigQueryInsertJobOperator comme d'habitude. L'opérateur n'a pas besoin de configuration supplémentaire spécifique aux tables BigLake Iceberg.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Remplacez les éléments suivants :
TABLE_IDpar l'ID de la table, au formatPROJECT_ID.DATASET_ID.TABLE_NAME.