Creare ed eseguire query sulle tabelle BigLake Iceberg in BigQuery

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina spiega come creare e modificare le tabelle BigLake Iceberg in BigQuery utilizzando gli operatori Airflow nell'ambiente Cloud Composer.

Informazioni sulle tabelle BigLake Iceberg in BigQuery

Le tabelle BigLake Iceberg in BigQuery forniscono le basi per la creazione di lakehouse in formato aperto su Google Cloud. Le tabelle BigLake Iceberg in BigQuery offrono la stessa esperienza completamente gestita delle tabelle BigQuery standard, ma archiviano i dati in bucket di archiviazione di proprietà del cliente. Le tabelle BigLake Iceberg in BigQuery supportano il formato di tabella Iceberg aperto per una migliore interoperabilità con motori di calcolo open source e di terze parti su una singola copia dei dati.

Prima di iniziare

Crea una tabella BigLake Iceberg in BigQuery

Per creare una tabella BigLake Iceberg in BigQuery, utilizza BigQueryCreateTableOperator nello stesso modo in cui utilizzi le altre tabelle BigQuery. Nel campo biglakeConfiguration, fornisci la configurazione per la tabella.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto.
  • DATASET_ID: un set di dati esistente.
  • TABLE_NAME: il nome della tabella che stai creando.
  • CONNECTION_NAME: il nome della connessione alla risorsa cloud nel formato projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.
  • STORAGE_URI: un URI Cloud Storage completo per la tabella. Ad esempio gs://example-bucket/iceberg-table.

Esegui query su una tabella BigLake Iceberg in BigQuery

Dopo aver creato una tabella BigLake Iceberg, puoi eseguirvi query con BigQueryInsertJobOperator come di consueto. L'operatore non ha bisogno di una configurazione aggiuntiva specifica per le tabelle BigLake Iceberg.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

Sostituisci quanto segue:

  • TABLE_ID con l'ID tabella, nel formato PROJECT_ID.DATASET_ID.TABLE_NAME.

Passaggi successivi