Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Halaman ini menjelaskan cara membuat dan mengubah tabel BigLake Iceberg di BigQuery menggunakan operator Airflow di lingkungan Managed Airflow Anda.
Tentang tabel BigLake Iceberg di BigQuery
Tabel BigLake Iceberg di BigQuery menyediakan dasar untuk membangun lakehouse format terbuka Google Cloud. Tabel BigLake Iceberg di BigQuery menawarkan pengalaman yang sama dan dikelola sepenuhnya seperti tabel BigQuery standar, tetapi menyimpan data di bucket penyimpanan milik pelanggan. Tabel BigLake Iceberg di BigQuery mendukung format tabel Iceberg terbuka untuk interoperabilitas yang lebih baik dengan mesin komputasi open source dan pihak ketiga pada satu salinan data.
Sebelum memulai
Pastikan Anda memahami batasan untuk tabel BigLake Iceberg di BigQuery. Batasan yang sama berlaku saat menggunakan tabel BigLake Iceberg melalui operator Airflow.
Buat koneksi Resource Cloud untuk bucket Cloud Storage tempat tabel BigLake Iceberg Anda akan berada.
Pastikan izin IAM ditetapkan sebagai berikut:
Akun layanan lingkungan Anda harus memiliki peran IAM untuk menggunakan tabel BigLake Iceberg di BigQuery. Ikuti petunjuk yang diuraikan dalam Tabel BigLake untuk Apache Iceberg di BigQuery.
Akun layanan yang terkait dengan koneksi Resource Cloud harus memiliki peran IAM untuk membaca dan menulis data di Cloud Storage. Ikuti petunjuk yang diuraikan dalam Tabel BigLake untuk Apache Iceberg di BigQuery.
Membuat tabel BigLake Iceberg di BigQuery
Untuk membuat tabel BigLake Iceberg di BigQuery, gunakan BigQueryCreateTableOperator dengan cara yang sama seperti untuk tabel BigQuery lainnya. Di kolom biglakeConfiguration, berikan konfigurasi untuk tabel.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
Ganti kode berikut:
PROJECT_ID: Project ID.DATASET_ID: set data yang sudah ada.TABLE_NAME: nama tabel yang Anda buat.CONNECTION_NAME: nama koneksi Resource Cloud dalam formatprojects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID.STORAGE_URI: Cloud Storage URI yang sepenuhnya memenuhi syarat untuk tabel. Misalnya,gs://example-bucket/iceberg-table.
Menjalankan kueri pada tabel BigLake Iceberg di BigQuery
Setelah membuat tabel BigLake Iceberg, Anda dapat menjalankan kueri tabel tersebut dengan BigQueryInsertJobOperator seperti biasa. Operator tidak memerlukan konfigurasi tambahan khusus untuk tabel BigLake Iceberg.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
Ganti kode berikut:
TABLE_IDdengan ID tabel, dalam formatPROJECT_ID.DATASET_ID.TABLE_NAME.