Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Cette page explique comment créer et modifier des tables BigLake Iceberg dans BigQuery à l'aide d'opérateurs Airflow dans votre environnement Cloud Composer.
À propos des tables BigLake Iceberg dans BigQuery
Les tables BigLake Iceberg dans BigQuery constituent la base de la création de lakehouses au format ouvert sur Google Cloud. Les tables BigLake Iceberg dans BigQuery offrent la même expérience entièrement gérée que les tables BigQuery standards, mais stockent les données dans des buckets de stockage détenus par le client. Les tables BigLake Iceberg dans BigQuery sont compatibles avec le format de table Iceberg ouvert pour une meilleure interopérabilité avec les moteurs de calcul Open Source et tiers sur une seule copie des données.
Avant de commencer
Assurez-vous de connaître les limites des tables BigLake Iceberg dans BigQuery. Les mêmes limites s'appliquent lorsque vous utilisez des tables BigLake Iceberg avec des opérateurs Airflow.
Créez une connexion de ressource Cloud pour le bucket Cloud Storage dans lequel se trouvera votre table BigLake Iceberg.
Assurez-vous que les autorisations IAM sont attribuées comme suit :
Le compte de service de votre environnement doit disposer de rôles IAM pour pouvoir utiliser les tables BigLake Iceberg dans BigQuery. Suivez les instructions décrites dans Tables BigLake pour Apache Iceberg dans BigQuery.
Le compte de service associé à la connexion à la ressource Cloud doit disposer de rôles IAM pour lire et écrire des données dans Cloud Storage. Suivez les instructions décrites dans Tables BigLake pour Apache Iceberg dans BigQuery.
Créer une table BigLake Iceberg dans BigQuery
Pour créer une table BigLake Iceberg dans BigQuery, utilisez BigQueryCreateTableOperator de la même manière que pour les autres tables BigQuery. Dans le champ biglakeConfiguration, indiquez la configuration de la table.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Remplacez les éléments suivants :
PROJECT_ID: ID du projet.DATASET_ID: ensemble de données existant.TABLE_NAME: nom de la table que vous créez.CONNECTION_NAME: nom de la connexion à la ressource cloud au formatprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: URI Cloud Storage complet pour le tableau. Exemple :gs://example-bucket/iceberg-table.
Interroger une table BigLake Iceberg dans BigQuery
Une fois la table BigLake Iceberg créée, vous pouvez l'interroger avec BigQueryInsertJobOperator comme d'habitude. L'opérateur n'a pas besoin de configuration supplémentaire spécifique aux tables BigLake Iceberg.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Remplacez les éléments suivants :
TABLE_IDavec l'ID de la table, au formatPROJECT_ID.DATASET_ID.TABLE_NAME.