Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
이 페이지에서는 Cloud Composer 환경에서 Airflow 연산자를 사용하여 BigQuery에서 BigLake Iceberg 테이블을 만들고 수정하는 방법을 설명합니다.
BigQuery의 BigLake Iceberg 테이블 정보
BigQuery의 BigLake Iceberg 테이블은 Google Cloud에서 개방형 형식의 레이크하우스를 빌드하기 위한 기반을 제공합니다. BigQuery의 BigLake Iceberg 테이블은 표준 BigQuery 테이블과 동일한 완전 관리형 환경을 제공하지만 고객 소유 스토리지 버킷에 데이터를 저장합니다. BigQuery의 BigLake Iceberg 테이블은 단일 데이터 사본에서 오픈소스 및 서드 파티 컴퓨팅 엔진과의 상호 운용성을 개선하기 위해 개방형 Iceberg 테이블 형식을 지원합니다.
시작하기 전에
BigQuery의 BigLake Iceberg 테이블 제한사항을 숙지해야 합니다. Airflow 연산자를 통해 BigLake Iceberg 테이블을 사용할 때도 동일한 제한사항이 적용됩니다.
BigLake Iceberg 테이블이 위치할 Cloud Storage 버킷에 대한 Cloud 리소스 연결을 만듭니다.
IAM 권한이 다음과 같이 할당되어 있는지 확인합니다.
환경의 서비스 계정에는 BigQuery에서 BigLake Iceberg 테이블을 사용하는 데 필요한 IAM 역할이 있어야 합니다. BigQuery의 Apache Iceberg용 BigLake 테이블에 설명된 안내를 따르세요.
Cloud 리소스 연결과 연결된 서비스 계정에는 Cloud Storage에서 데이터를 읽고 쓰는 IAM 역할이 있어야 합니다. BigQuery의 Apache Iceberg용 BigLake 테이블에 설명된 안내를 따르세요.
BigQuery에서 BigLake Iceberg 테이블 만들기
BigQuery에서 BigLake Iceberg 테이블을 만들려면 다른 BigQuery 테이블과 마찬가지로 BigQueryCreateTableOperator를 사용합니다. biglakeConfiguration 필드에 표의 구성을 제공합니다.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
with DAG(
"bq_iceberg_dag",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
create_iceberg_table = BigQueryCreateTableOperator(
task_id="create_iceberg_table",
project_id="PROJECT_ID",
dataset_id="DATASET_ID",
table_id="TABLE_NAME",
table_resource={
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
]
},
"biglakeConfiguration": {
"connectionId": "CONNECTION_NAME",
"storageUri": "STORAGE_URI",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}
}
)
다음을 바꿉니다.
PROJECT_ID: 프로젝트 IDDATASET_ID: 기존 데이터 세트TABLE_NAME: 생성할 테이블의 이름CONNECTION_NAME:projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID형식의 클라우드 리소스 연결의 이름입니다.STORAGE_URI: 테이블의 정규화된 Cloud Storage URI. 예를 들면gs://example-bucket/iceberg-table입니다.
BigQuery에서 BigLake Iceberg 테이블 쿼리
BigLake Iceberg 테이블을 만든 후에는 평소와 같이 BigQueryInsertJobOperator로 쿼리할 수 있습니다. 운영자는 BigLake Iceberg 테이블을 위해 특별히 추가 구성을 할 필요가 없습니다.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_iceberg_dag_query",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
) as dag:
insert_values = BigQueryInsertJobOperator(
task_id="iceberg_insert_values",
configuration={
"query": {
"query": f"""
INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
VALUES
(101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
(102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
(103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
(104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
""",
"useLegacySql": False,
}
}
)
다음을 바꿉니다.
TABLE_ID를 테이블 ID로 바꿉니다(PROJECT_ID.DATASET_ID.TABLE_NAME형식).