Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie BigLake Iceberg-Tabellen in BigQuery mit Airflow-Operatoren in Ihrer Cloud Composer-Umgebung erstellen und ändern.
BigLake Iceberg-Tabellen in BigQuery
BigLake Iceberg-Tabellen in BigQuery bilden die Grundlage für die Erstellung von Lakehouses im offenen Format auf Google Cloud. BigLake-Iceberg-Tabellen in BigQuery bieten dieselbe vollständig verwaltete Umgebung wie Standard-BigQuery-Tabellen, speichern Daten aber in kundeneigenen Speicher-Buckets. BigLake Iceberg-Tabellen in BigQuery unterstützen das offene Iceberg-Tabellenformat für eine bessere Interoperabilität mit Open-Source- und Drittanbieter-Compute-Engines für eine einzelne Kopie von Daten.
Hinweise
Machen Sie sich mit den Einschränkungen für BigLake-Iceberg-Tabellen in BigQuery vertraut. Dieselben Einschränkungen gelten, wenn Sie über Airflow-Operatoren mit BigLake-Iceberg-Tabellen arbeiten.
Erstellen Sie eine Cloud-Ressourcenverbindung für den Cloud Storage-Bucket, in dem sich Ihre BigLake-Iceberg-Tabelle befindet.
Achten Sie darauf, dass IAM-Berechtigungen wie folgt zugewiesen werden:
Das Dienstkonto Ihrer Umgebung muss IAM-Rollen für die Arbeit mit BigLake-Iceberg-Tabellen in BigQuery haben. Folgen Sie der Anleitung unter BigLake-Tabellen für Apache Iceberg in BigQuery.
Das mit der Cloud Resource-Verbindung verknüpfte Dienstkonto muss IAM-Rollen zum Lesen und Schreiben von Daten in Cloud Storage haben. Folgen Sie der Anleitung unter BigLake-Tabellen für Apache Iceberg in BigQuery.
BigLake Iceberg-Tabelle in BigQuery erstellen
Wenn Sie eine BigLake Iceberg-Tabelle in BigQuery erstellen möchten, verwenden Sie BigQueryCreateTableOperator auf dieselbe Weise wie für andere BigQuery-Tabellen. Geben Sie im Feld biglakeConfiguration die Konfiguration für die Tabelle an.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Ersetzen Sie Folgendes:
PROJECT_ID: die Projekt-ID.DATASET_ID: ein vorhandenes Dataset.TABLE_NAME: Der Name der Tabelle, die Sie erstellen.CONNECTION_NAME: der Name der Cloud-Ressourcenverbindung im Formatprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: ein vollständig qualifizierter Cloud Storage-URI für die Tabelle. Beispiel:gs://example-bucket/iceberg-table.
BigLake Iceberg-Tabelle in BigQuery abfragen
Nachdem Sie eine BigLake Iceberg-Tabelle erstellt haben, können Sie sie wie gewohnt mit BigQueryInsertJobOperator abfragen. Für BigLake Iceberg-Tabellen ist keine zusätzliche Konfiguration durch den Operator erforderlich.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Ersetzen Sie Folgendes:
TABLE_IDdurch die Tabellen-ID im FormatPROJECT_ID.DATASET_ID.TABLE_NAME.