Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina spiega come creare e modificare le tabelle BigLake Iceberg in BigQuery utilizzando gli operatori Airflow nell'ambiente Cloud Composer.
Informazioni sulle tabelle BigLake Iceberg in BigQuery
Le tabelle BigLake Iceberg in BigQuery forniscono le basi per la creazione di lakehouse in formato aperto su Google Cloud. Le tabelle BigLake Iceberg in BigQuery offrono la stessa esperienza completamente gestita delle tabelle BigQuery standard, ma archiviano i dati in bucket di archiviazione di proprietà del cliente. Le tabelle BigLake Iceberg in BigQuery supportano il formato di tabella Iceberg aperto per una migliore interoperabilità con motori di calcolo open source e di terze parti su una singola copia dei dati.
Prima di iniziare
Assicurati di conoscere le limitazioni per le tabelle BigLake Iceberg in BigQuery. Le stesse limitazioni si applicano quando si lavora con le tabelle BigLake Iceberg tramite gli operatori Airflow.
Crea una connessione di risorsa Cloud per il bucket Cloud Storage in cui si troverà la tabella BigLake Iceberg.
Assicurati che le autorizzazioni IAM siano assegnate come segue:
Il service account del tuo ambiente deve disporre dei ruoli IAM per lavorare con le tabelle BigLake Iceberg in BigQuery. Segui le istruzioni descritte in Tabelle BigLake per Apache Iceberg in BigQuery.
Il service account associato alla connessione alla risorsa cloud deve disporre dei ruoli IAM per la lettura e la scrittura dei dati in Cloud Storage. Segui le istruzioni descritte in Tabelle BigLake per Apache Iceberg in BigQuery.
Crea una tabella BigLake Iceberg in BigQuery
Per creare una tabella BigLake Iceberg in BigQuery, utilizza
BigQueryCreateTableOperator nello stesso modo in cui utilizzi le altre tabelle
BigQuery. Nel campo biglakeConfiguration, fornisci la configurazione per la tabella.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Sostituisci quanto segue:
PROJECT_ID: l'ID progetto.DATASET_ID: un set di dati esistente.TABLE_NAME: il nome della tabella che stai creando.CONNECTION_NAME: il nome della connessione alla risorsa cloud nel formatoprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: un URI Cloud Storage completo per la tabella. Ad esempiogs://example-bucket/iceberg-table.
Esegui query su una tabella BigLake Iceberg in BigQuery
Dopo aver creato una tabella BigLake Iceberg, puoi eseguirvi query con BigQueryInsertJobOperator come di consueto. L'operatore non ha bisogno di una configurazione aggiuntiva specifica per le tabelle BigLake Iceberg.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Sostituisci quanto segue:
TABLE_IDcon l'ID tabella, nel formatoPROJECT_ID.DATASET_ID.TABLE_NAME.