Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se explica cómo crear y modificar tablas de BigLake Iceberg en BigQuery con operadores de Airflow en tu entorno de Cloud Composer.
Acerca de las tablas de BigLake Iceberg en BigQuery
Las tablas de BigLake Iceberg en BigQuery proporcionan la base para compilar lakehouses de formato abierto en Google Cloud. Las tablas de BigLake Iceberg en BigQuery ofrecen la misma experiencia completamente administrada que las tablas estándar de BigQuery, pero almacenan datos en buckets de almacenamiento que pertenecen al cliente. Las tablas de BigLake Iceberg en BigQuery admiten el formato de tabla de Iceberg abierto para una mejor interoperabilidad con motores de procesamiento de código abierto y de terceros en una sola copia de los datos.
Antes de comenzar
Asegúrate de conocer las limitaciones de las tablas de BigLake Iceberg en BigQuery. Las mismas limitaciones se aplican cuando se trabaja con tablas de BigLake Iceberg a través de operadores de Airflow.
Crea una conexión de recursos de Cloud para el bucket de Cloud Storage en el que se ubicará tu tabla de Iceberg de BigLake.
Asegúrate de que los permisos de IAM se asignen de la siguiente manera:
La cuenta de servicio de tu entorno debe tener roles de IAM para trabajar con tablas de BigLake Iceberg en BigQuery. Sigue las instrucciones que se describen en Tablas de BigLake para Apache Iceberg en BigQuery.
La cuenta de servicio asociada con la conexión de recursos de Cloud debe tener roles de IAM para leer y escribir datos en Cloud Storage. Sigue las instrucciones que se describen en Tablas de BigLake para Apache Iceberg en BigQuery.
Crea una tabla de BigLake Iceberg en BigQuery
Para crear una tabla de BigLake Iceberg en BigQuery, usa BigQueryCreateTableOperator de la misma manera que para otras tablas de BigQuery. En el campo biglakeConfiguration, proporciona la configuración de la tabla.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Reemplaza lo siguiente:
PROJECT_ID: Es el ID del proyecto.DATASET_ID: Es un conjunto de datos existente.TABLE_NAME: es el nombre de la tabla que crearás.CONNECTION_NAME: Es el nombre de la conexión de recursos de nube en formatoprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: Es un URI de Cloud Storage completamente calificado para la tabla. Por ejemplo,gs://example-bucket/iceberg-table.
Consulta una tabla de BigLake Iceberg en BigQuery
Después de crear una tabla de BigLake Iceberg, puedes consultarla con BigQueryInsertJobOperator como de costumbre. El operador no necesita configuración adicional específica para las tablas de BigLake Iceberg.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Reemplaza lo siguiente:
TABLE_IDcon el ID de la tabla, en el formatoPROJECT_ID.DATASET_ID.TABLE_NAME