BigQuery에서 BigLake Iceberg 테이블 만들기 및 쿼리

관리형 Airflow (3세대) | 관리형 Airflow (2세대) | 관리형 Airflow (기존 1세대)

이 페이지에서는 관리형 Airflow 환경에서 Airflow 연산자를 사용하여 BigQuery에서 BigLake Iceberg 테이블을 만들고 수정하는 방법을 설명합니다.

BigQuery의 BigLake Iceberg 테이블 정보

BigQuery의 BigLake Iceberg 테이블 에서 개방형 형식의 레이크하우스를 빌드하기 위한 기반을 제공합니다 Google Cloud. BigQuery의 BigLake Iceberg 테이블은 표준 BigQuery 테이블과 동일한 완전 관리형 환경을 제공하지만 고객 소유 스토리지 버킷에 데이터를 저장합니다. BigQuery의 BigLake Iceberg 테이블은 단일 데이터 사본에서 오픈소스 및 서드 파티 컴퓨팅 엔진과의 상호 운용성을 개선하기 위해 개방형 Iceberg 테이블 형식을 지원합니다.

시작하기 전에

BigQuery에서 BigLake Iceberg 테이블 만들기

BigQuery에서 BigLake Iceberg 테이블을 만들려면 다른 BigQuery 테이블과 마찬가지로 BigQueryCreateTableOperator를 사용합니다. biglakeConfiguration 필드에 테이블 구성을 제공합니다.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • DATASET_ID: 기존 데이터 세트
  • TABLE_NAME: 생성할 테이블의 이름
  • CONNECTION_NAME: Cloud 리소스 연결 이름 projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID 형식
  • STORAGE_URI: 테이블의 정규화된 Cloud Storage URI 예를 들어 gs://example-bucket/iceberg-table입니다.

BigQuery에서 BigLake Iceberg 테이블 쿼리

BigLake Iceberg 테이블을 만든 후에는 평소와 같이 BigQueryInsertJobOperator를 사용하여 쿼리할 수 있습니다. 연산자에는 BigLake Iceberg 테이블을 위한 추가 구성이 필요하지 않습니다.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

다음을 바꿉니다.

  • TABLE_IDPROJECT_ID.DATASET_ID.TABLE_NAME 형식의 테이블 ID로 바꿉니다.

다음 단계