Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página explica como criar e modificar tabelas Iceberg do BigLake no BigQuery através de operadores do Airflow no seu ambiente do Cloud Composer.
Acerca das tabelas Iceberg do BigLake no BigQuery
As tabelas Iceberg do BigLake no BigQuery fornecem a base para criar data lakes de formato aberto no Google Cloud. As tabelas Iceberg do BigLake no BigQuery oferecem a mesma experiência totalmente gerida que as tabelas padrão do BigQuery, mas armazenam dados em contentores de armazenamento pertencentes ao cliente. As tabelas Iceberg do BigLake no BigQuery suportam o formato de tabela Iceberg aberto para uma melhor interoperabilidade com motores de computação de código aberto e de terceiros numa única cópia de dados.
Antes de começar
Certifique-se de que conhece as limitações das tabelas Iceberg do BigLake no BigQuery. As mesmas limitações aplicam-se quando trabalha com tabelas Iceberg do BigLake através de operadores do Airflow.
Crie uma ligação de recursos na nuvem para o contentor do Cloud Storage onde a tabela Iceberg do BigLake vai estar localizada.
Certifique-se de que as autorizações de IAM estão atribuídas da seguinte forma:
A conta de serviço do seu ambiente tem de ter funções de IAM para trabalhar com tabelas Iceberg do BigLake no BigQuery. Siga as instruções descritas no artigo Tabelas do BigLake para o Apache Iceberg no BigQuery.
A conta de serviço associada à ligação de recursos da nuvem tem de ter funções do IAM para ler e escrever dados no Cloud Storage. Siga as instruções descritas no artigo Tabelas do BigLake para o Apache Iceberg no BigQuery.
Crie uma tabela Iceberg do BigLake no BigQuery
Para criar uma tabela Iceberg do BigLake no BigQuery, use o
BigQueryCreateTableOperator da mesma forma que para outras tabelas do
BigQuery. No campo biglakeConfiguration, indique a configuração da tabela.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Substitua o seguinte:
PROJECT_ID: o ID do projeto.DATASET_ID: um conjunto de dados existente.TABLE_NAME: o nome da tabela que está a criar.CONNECTION_NAME: o nome da ligação de recursos da nuvem no formatoprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: um URI do Cloud Storage totalmente qualificado para a tabela. Por exemplo,gs://example-bucket/iceberg-table.
Consulte uma tabela Iceberg do BigLake no BigQuery
Depois de criar uma tabela Iceberg do BigLake, pode consultá-la com o BigQueryInsertJobOperator como habitualmente. O operador não precisa de configuração adicional especificamente para tabelas Iceberg do BigLake.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Substitua o seguinte:
TABLE_IDcom o ID da tabela, no formatoPROJECT_ID.DATASET_ID.TABLE_NAME.