BigQuery에서 BigLake Iceberg 테이블 만들기 및 쿼리

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 Cloud Composer 환경에서 Airflow 연산자를 사용하여 BigQuery에서 BigLake Iceberg 테이블을 만들고 수정하는 방법을 설명합니다.

BigQuery의 BigLake Iceberg 테이블 정보

BigQuery의 BigLake Iceberg 테이블은 Google Cloud에서 개방형 형식의 레이크하우스를 빌드하기 위한 기반을 제공합니다. BigQuery의 BigLake Iceberg 테이블은 표준 BigQuery 테이블과 동일한 완전 관리형 환경을 제공하지만 고객 소유 스토리지 버킷에 데이터를 저장합니다. BigQuery의 BigLake Iceberg 테이블은 단일 데이터 사본에서 오픈소스 및 서드 파티 컴퓨팅 엔진과의 상호 운용성을 개선하기 위해 개방형 Iceberg 테이블 형식을 지원합니다.

시작하기 전에

BigQuery에서 BigLake Iceberg 테이블 만들기

BigQuery에서 BigLake Iceberg 테이블을 만들려면 다른 BigQuery 테이블과 마찬가지로 BigQueryCreateTableOperator를 사용합니다. biglakeConfiguration 필드에 표의 구성을 제공합니다.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator

with DAG(
  "bq_iceberg_dag",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  create_iceberg_table = BigQueryCreateTableOperator(
    task_id="create_iceberg_table",
    project_id="PROJECT_ID",
    dataset_id="DATASET_ID",
    table_id="TABLE_NAME",
    table_resource={
      "schema": {
        "fields": [
          {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "customer_id", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "amount", "type": "INTEGER", "mode": "REQUIRED"},
          {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
        ]
      },
      "biglakeConfiguration": {
        "connectionId": "CONNECTION_NAME",
        "storageUri": "STORAGE_URI",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG",
      }
    }
  )

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • DATASET_ID: 기존 데이터 세트
  • TABLE_NAME: 생성할 테이블의 이름
  • CONNECTION_NAME: projects/PROJECT_ID/locations/REGION/connections/CONNECTION_ID 형식의 클라우드 리소스 연결의 이름입니다.
  • STORAGE_URI: 테이블의 정규화된 Cloud Storage URI. 예를 들면 gs://example-bucket/iceberg-table입니다.

BigQuery에서 BigLake Iceberg 테이블 쿼리

BigLake Iceberg 테이블을 만든 후에는 평소와 같이 BigQueryInsertJobOperator로 쿼리할 수 있습니다. 운영자는 BigLake Iceberg 테이블을 위해 특별히 추가 구성을 할 필요가 없습니다.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_iceberg_dag_query",
  start_date=datetime.datetime(2025, 1, 1),
  schedule=None,
  ) as dag:

  insert_values = BigQueryInsertJobOperator(
    task_id="iceberg_insert_values",
    configuration={
      "query": {
        "query": f"""
          INSERT INTO `TABLE_ID` (order_id, customer_id, amount, created_at)
          VALUES
            (101, 19, 1, TIMESTAMP '2025-09-15 10:15:00+00'),
            (102, 35, 2, TIMESTAMP '2025-09-14 10:15:00+00'),
            (103, 36, 3, TIMESTAMP '2025-09-12 10:15:00+00'),
            (104, 37, 4, TIMESTAMP '2025-09-11 10:15:00+00')
        """,
        "useLegacySql": False,
        }
      }
  )

다음을 바꿉니다.

  • TABLE_ID를 테이블 ID로 바꿉니다(PROJECT_ID.DATASET_ID.TABLE_NAME 형식).

다음 단계