관리형 Airflow (3세대) | 관리형 Airflow (2세대) | 관리형 Airflow (기존 1세대)
이 페이지에서는 관리형 Airflow 환경에서 Airflow 연산자를 사용하여 BigQuery에서 BigLake Iceberg 테이블을 만들고 수정하는 방법을 설명합니다.
BigQuery의 BigLake Iceberg 테이블 정보
BigQuery의 BigLake Iceberg 테이블 에서 개방형 형식의 레이크하우스를 빌드하기 위한 기반을 제공합니다 Google Cloud. BigQuery의 BigLake Iceberg 테이블은 표준 BigQuery 테이블과 동일한 완전 관리형 환경을 제공하지만 고객 소유 스토리지 버킷에 데이터를 저장합니다. BigQuery의 BigLake Iceberg 테이블은 단일 데이터 사본에서 오픈소스 및 서드 파티 컴퓨팅 엔진과의 상호 운용성을 개선하기 위해 개방형 Iceberg 테이블 형식을 지원합니다.
시작하기 전에
BigQuery의 BigLake Iceberg 테이블 제한사항을 숙지해야 합니다 . Airflow 연산자를 통해 BigLake Iceberg 테이블을 사용할 때도 동일한 제한사항이 적용됩니다.
BigLake Iceberg 테이블이 위치할 Cloud Storage 버킷의 Cloud 리소스 연결을 만듭니다.
IAM 권한이 다음과 같이 할당되어 있는지 확인합니다.
환경의 서비스 계정 에는 BigQuery에서 BigLake Iceberg 테이블을 사용하기 위한 IAM 역할이 있어야 합니다. BigQuery의 Apache Iceberg용 BigLake 테이블에 설명된 안내를 따르세요.
Cloud 리소스 연결과 연결된 서비스 계정 에는 Cloud Storage에서 데이터를 읽고 쓰기 위한 IAM 역할이 있어야 합니다. BigQuery의 Apache Iceberg용 BigLake 테이블에 설명된 안내를 따르세요.
BigQuery에서 BigLake Iceberg 테이블 만들기
BigQuery에서 BigLake Iceberg 테이블을 만들려면 다른 BigQuery 테이블과 마찬가지로 BigQueryCreateTableOperator를 사용합니다. biglakeConfiguration 필드에 테이블 구성을 제공합니다.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
다음을 바꿉니다.
PROJECT_ID: 프로젝트 IDDATASET_ID: 기존 데이터 세트TABLE_NAME: 생성할 테이블의 이름CONNECTION_NAME: Cloud 리소스 연결 이름projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID형식STORAGE_URI: 테이블의 정규화된 Cloud Storage URI 예를 들어gs://example-bucket/iceberg-table입니다.
BigQuery에서 BigLake Iceberg 테이블 쿼리
BigLake Iceberg 테이블을 만든 후에는 평소와 같이 BigQueryInsertJobOperator를 사용하여 쿼리할 수 있습니다. 연산자에는 BigLake Iceberg 테이블을 위한 추가 구성이 필요하지 않습니다.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
다음을 바꿉니다.
TABLE_ID를PROJECT_ID.DATASET_ID.TABLE_NAME형식의 테이블 ID로 바꿉니다.