Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Halaman ini menjelaskan cara membuat dan mengubah tabel BigLake Iceberg di BigQuery menggunakan operator Airflow di lingkungan Cloud Composer Anda.
Tentang tabel BigLake Iceberg di BigQuery
Tabel BigLake Iceberg di BigQuery memberikan fondasi untuk membangun lakehouse berformat terbuka di Google Cloud. Tabel Iceberg BigLake di BigQuery menawarkan pengalaman terkelola sepenuhnya yang sama dengan tabel BigQuery standar, tetapi menyimpan data di bucket penyimpanan milik pelanggan. Tabel Iceberg BigLake di BigQuery mendukung format tabel Iceberg terbuka untuk interoperabilitas yang lebih baik dengan mesin komputasi open source dan pihak ketiga pada satu salinan data.
Sebelum memulai
Pastikan Anda memahami batasan untuk tabel BigLake Iceberg di BigQuery. Batasan yang sama berlaku saat menggunakan tabel BigLake Iceberg melalui operator Airflow.
Buat koneksi Resource Cloud untuk bucket Cloud Storage tempat tabel Iceberg BigLake Anda akan berada.
Pastikan izin IAM ditetapkan sebagai berikut:
Akun layanan lingkungan Anda harus memiliki peran IAM untuk bekerja dengan tabel Iceberg BigLake di BigQuery. Ikuti petunjuk yang diuraikan dalam Tabel BigLake untuk Apache Iceberg di BigQuery.
Akun layanan yang terkait dengan koneksi Cloud Resource harus memiliki peran IAM untuk membaca dan menulis data di Cloud Storage. Ikuti petunjuk yang diuraikan dalam Tabel BigLake untuk Apache Iceberg di BigQuery.
Membuat tabel BigLake Iceberg di BigQuery
Untuk membuat tabel BigLake Iceberg di BigQuery, gunakan
BigQueryCreateTableOperator dengan cara yang sama seperti untuk tabel
BigQuery lainnya. Di kolom biglakeConfiguration, berikan konfigurasi untuk
tabel.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Ganti kode berikut:
PROJECT_ID: Project ID.DATASET_ID: set data yang sudah ada.TABLE_NAME: nama tabel yang Anda buat.CONNECTION_NAME: nama koneksi Resource Cloud dalam formatprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: Cloud Storage URI yang sepenuhnya memenuhi syarat untuk tabel. Contohnya,gs://example-bucket/iceberg-table.
Menjalankan kueri pada tabel Iceberg BigLake di BigQuery
Setelah membuat tabel BigLake Iceberg, Anda dapat membuat kuerinya dengan BigQueryInsertJobOperator seperti biasa. Operator tidak memerlukan konfigurasi tambahan khusus untuk tabel BigLake Iceberg.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Ganti kode berikut:
TABLE_IDdengan ID tabel, dalam formatPROJECT_ID.DATASET_ID.TABLE_NAME.