Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se explica cómo crear y modificar tablas de Iceberg de BigLake en BigQuery mediante operadores de Airflow en su entorno de Cloud Composer.
Acerca de las tablas de Iceberg de BigLake en BigQuery
Las tablas de BigLake Iceberg en BigQuery proporcionan la base para crear lakehouses de formato abierto en Google Cloud. Las tablas de Iceberg de BigLake en BigQuery ofrecen la misma experiencia totalmente gestionada que las tablas estándar de BigQuery, pero almacenan los datos en los cubos de almacenamiento propiedad del cliente. Las tablas de BigLake Iceberg en BigQuery admiten el formato de tabla Iceberg abierto para mejorar la interoperabilidad con motores de computación de código abierto y de terceros en una sola copia de los datos.
Antes de empezar
Asegúrate de que conoces las limitaciones de las tablas de Iceberg de BigLake en BigQuery. Se aplican las mismas limitaciones al trabajar con tablas de Iceberg de BigLake a través de operadores de Airflow.
Crea una conexión de recurso de Cloud para el segmento de Cloud Storage en el que se ubicará tu tabla Iceberg de BigLake.
Asegúrate de que los permisos de gestión de identidades y accesos se asignan de la siguiente manera:
La cuenta de servicio de tu entorno debe tener roles de gestión de identidades y accesos para trabajar con tablas Iceberg de BigLake en BigQuery. Sigue las instrucciones que se indican en el artículo sobre las tablas de BigLake para Apache Iceberg en BigQuery.
La cuenta de servicio asociada a la conexión de recursos de Cloud debe tener roles de gestión de identidades y accesos para leer y escribir datos en Cloud Storage. Sigue las instrucciones que se indican en el artículo sobre las tablas de BigLake para Apache Iceberg en BigQuery.
Crear una tabla de Iceberg de BigLake en BigQuery
Para crear una tabla de Iceberg de BigLake en BigQuery, usa BigQueryCreateTableOperator de la misma forma que con otras tablas de BigQuery. En el campo biglakeConfiguration, proporciona la configuración de la tabla.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Haz los cambios siguientes:
PROJECT_ID: el ID de proyecto.DATASET_ID: un conjunto de datos.TABLE_NAME: el nombre de la tabla que vas a crear.CONNECTION_NAME: el nombre de la conexión de Cloud Resource en el formatoprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: un URI de Cloud Storage completo de la tabla. Por ejemplo,gs://example-bucket/iceberg-table.
Consultar una tabla de Iceberg de BigLake en BigQuery
Después de crear una tabla de Iceberg de BigLake, puedes consultarla con BigQueryInsertJobOperator como de costumbre. El operador no necesita ninguna configuración adicional específica para las tablas de iceberg de BigLake.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Haz los cambios siguientes:
TABLE_IDcon el ID de la tabla, en el formatoPROJECT_ID.DATASET_ID.TABLE_NAME.