BigQuery 싱크 커넥터를 사용하면 Kafka에서 BigQuery로 데이터를 스트리밍하여 BigQuery 내에서 실시간 데이터 수집 및 분석을 할 수 있습니다. BigQuery 싱크 커넥터는 하나 이상의 Kafka 주제에서 레코드를 소비하고 단일 BigQuery 데이터 세트 내의 하나 이상의 테이블에 데이터를 씁니다.
시작하기 전에
BigQuery 싱크 커넥터를 만들기 전에 다음이 있는지 확인하세요.
Connect 클러스터의 Apache Kafka용 관리형 서비스 클러스터를 만듭니다. 이 클러스터는 Connect 클러스터와 연결된 기본 Kafka 클러스터입니다. 이 클러스터는 BigQuery 싱크 커넥터 파이프라인의 한쪽 끝을 형성하는 소스 클러스터이기도 합니다.
BigQuery 싱크 커넥터를 호스팅할 Connect 클러스터를 만듭니다.
Kafka에서 스트리밍된 데이터를 저장할 BigQuery 데이터 세트를 만듭니다.
소스 클러스터 내에서 Kafka 주제를 만들고 구성합니다. 데이터가 이 Kafka 주제에서 대상 BigQuery 데이터 세트로 이동합니다.
필수 역할 및 권한
BigQuery 싱크 커넥터를 만드는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 커넥터 편집자 (roles/managedkafka.connectorEditor) IAM 역할을 부여해 달라고 요청하세요.
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
이 사전 정의된 역할에는 BigQuery 싱크 커넥터를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.
필수 권한
BigQuery 싱크 커넥터를 만들려면 다음 권한이 필요합니다.
-
상위 연결 클러스터에 커넥터 생성 권한을 부여합니다.
managedkafka.connectors.create
커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.
관리형 Kafka 커넥터 편집자 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.
Apache Kafka용 관리형 서비스 클러스터가 Connect 클러스터와 동일한 프로젝트에 있는 경우 추가 권한이 필요하지 않습니다. 클러스터가 다른 프로젝트에 있는 경우 다른 프로젝트에서 Connect 클러스터 만들기를 참고하세요.
BigQuery 테이블에 쓸 수 있는 권한 부여
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com 형식을 따르는 Connect 클러스터 서비스 계정에는 BigQuery 테이블에 쓸 수 있는 권한이 필요합니다. 이렇게 하려면 BigQuery 테이블이 포함된 프로젝트의 Connect 클러스터 서비스 계정에 BigQuery 데이터 편집자 (roles/bigquery.dataEditor) 역할을 부여합니다.
BigQuery 싱크 커넥터의 스키마
BigQuery 싱크 커넥터는 구성된 값 변환기(value.converter)를 사용하여 Kafka 레코드 값을 필드로 파싱합니다. 그런 다음 필드를 BigQuery 테이블의 이름이 같은 열에 씁니다.
커넥터를 작동하려면 스키마가 필요합니다. 스키마는 다음과 같은 방법으로 제공할 수 있습니다.
- 메시지 기반 스키마: 스키마가 각 메시지의 일부로 포함됩니다.
- 테이블 기반 스키마: 커넥터가 BigQuery 테이블 스키마에서 메시지 스키마를 추론합니다.
- 스키마 레지스트리: 커넥터가 Apache Kafka용 관리형 서비스 스키마 레지스트리(미리보기)와 같은 스키마 레지스트리에서 스키마를 읽습니다.
다음 섹션에서는 이러한 옵션을 설명합니다.
메시지 기반 스키마
이 모드에서는 각 Kafka 레코드에 JSON 스키마가 포함됩니다. 커넥터는 스키마를 사용하여 레코드 데이터를 BigQuery 테이블 행으로 씁니다.
메시지 기반 스키마를 사용하려면 커넥터에서 다음 속성을 설정하세요.
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Kafka 레코드 값의 예:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
대상 테이블이 이미 있는 경우 BigQuery 테이블 스키마가 삽입된 메시지 스키마와 호환되어야 합니다. autoCreateTables=true인 경우 커넥터는 필요한 경우 대상 테이블을 자동으로 만듭니다. 자세한 내용은 테이블 만들기를 참고하세요.
메시지 스키마가 변경될 때 커넥터가 BigQuery 테이블 스키마를 업데이트하도록 하려면 allowNewBigQueryFields, allowSchemaUnionization 또는 allowBigQueryRequiredFieldRelaxation를 true로 설정합니다.
표 기반 스키마
이 모드에서 Kafka 레코드에는 명시적 스키마가 없는 일반 JSON 데이터가 포함됩니다. 커넥터는 대상 테이블에서 스키마를 추론합니다.
요구사항:
- BigQuery 테이블이 이미 있어야 합니다.
- Kafka 레코드 데이터는 테이블 스키마와 호환되어야 합니다.
- 이 모드는 수신 메시지에 기반한 동적 스키마 업데이트를 지원하지 않습니다.
표 기반 스키마를 사용하려면 커넥터에서 다음 속성을 설정하세요.
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
BigQuery 테이블이 일별 파티션으로 시간 기반 파티션 나누기를 사용하는 경우 bigQueryPartitionDecorator는 true일 수 있습니다. 그렇지 않으면 이 속성을 false로 설정합니다.
Kafka 레코드 값의 예:
{
"user": "userId",
"age": 30
}
스키마 레지스트리
이 모드에서는 각 Kafka 레코드에 Apache Avro 데이터가 포함되고 메시지 스키마가 스키마 레지스트리에 저장됩니다.
스키마 레지스트리와 함께 BigQuery 싱크 커넥터를 사용하려면 커넥터에서 다음 속성을 설정하세요.
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
SCHEMA_REGISTRY_URL을 스키마 레지스트리의 URL로 바꿉니다.
Apache Kafka용 관리형 서비스 스키마 레지스트리와 함께 커넥터를 사용하려면 다음 속성을 설정하세요.
value.converter.bearer.auth.credentials.source=GCP
자세한 내용은 스키마 레지스트리와 함께 Kafka Connect 사용을 참고하세요.
BigQuery의 Apache Iceberg용 BigLake 테이블
BigQuery 싱크 커넥터는 BigQuery의 Apache Iceberg용 BigLake 테이블(이하 BigQuery의 BigLake Iceberg 테이블)을 싱크 타겟으로 지원합니다.
BigQuery의 BigLake Iceberg 테이블은 Google Cloud에서 개방형 형식의 레이크하우스를 빌드하기 위한 기반을 제공합니다. BigQuery의 BigLake Iceberg 테이블은 BigQuery 테이블과 동일한 완전 관리형 환경을 제공하지만, Parquet을 사용하여 고객 소유 스토리지 버킷에 데이터를 저장하여 Apache Iceberg 개방형 테이블 형식과 상호 운용할 수 있습니다.
Apache Iceberg 테이블을 만드는 방법은 Apache Iceberg 테이블 만들기를 참고하세요.
BigQuery 싱크 커넥터 만들기
콘솔
Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.
커넥터를 만들려는 Connect 클러스터를 클릭합니다.
커넥터 만들기를 클릭합니다.
커넥터 이름에 문자열을 입력합니다.
커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요.
커넥터 플러그인에서 BigQuery 싱크를 선택합니다.
주제 섹션에서 읽어올 Kafka 주제를 지정합니다. 주제 이름과 일치시킬 주제 목록 또는 정규 표현식을 지정할 수 있습니다.
옵션 1: Kafka 주제 목록 선택을 선택합니다. Kafka 주제 목록에서 주제를 하나 이상 선택합니다. 확인을 클릭합니다.
옵션 2: 주제 정규식 사용을 선택합니다. 주제 정규식 필드에 정규 표현식을 입력합니다.
데이터 세트를 클릭하고 BigQuery 데이터 세트를 지정합니다. 기존 데이터 세트를 선택하거나 새 데이터 세트를 만들 수 있습니다.
선택사항: 구성 상자에서 구성 속성을 추가하거나 기본 속성을 수정합니다. 자세한 내용은 커넥터 구성을 참고하세요.
작업 재시작 정책을 선택합니다. 자세한 내용은 작업 다시 시작 정책을 참고하세요.
만들기를 클릭합니다.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connectors create명령어를 실행합니다.gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE다음을 바꿉니다.
CONNECTOR_ID: 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.
LOCATION: 커넥터를 만드는 위치입니다. 이 위치는 Connect 클러스터를 만든 위치와 동일해야 합니다.
CONNECT_CLUSTER_ID: 커넥터가 생성된 Connect 클러스터의 ID입니다.
CONFIG_FILE: BigQuery 싱크 커넥터의 YAML 구성 파일 경로입니다.
다음은 BigQuery 싱크 커넥터의 구성 파일 예시입니다.
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"다음을 바꿉니다.
BQ_SINK_CONNECTOR_ID: BigQuery 싱크 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.
GCP_PROJECT_ID: BigQuery 데이터 세트가 있는 Google Cloud프로젝트의 ID입니다.
GMK_TOPIC_ID: 데이터가 BigQuery 싱크 커넥터로 흐르는 Managed Service for Apache Kafka 주제의 ID입니다.
BQ_DATASET_ID: 파이프라인의 싱크 역할을 하는 BigQuery 데이터 세트의 ID입니다.
Terraform
Terraform 리소스를 사용하여 커넥터를 만들 수 있습니다.
Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.
Go
이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
자바
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
Python
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
커넥터를 만든 후에는 커넥터를 수정, 삭제, 일시중지, 중지 또는 다시 시작할 수 있습니다.
커넥터 구성
이 섹션에서는 커넥터에서 설정할 수 있는 몇 가지 구성 속성을 설명합니다. 이 커넥터에만 해당하는 속성의 전체 목록은 BigQuery 싱크 커넥터 구성을 참고하세요.
테이블 이름
기본적으로 커넥터는 주제 이름을 BigQuery 테이블 이름으로 사용합니다. 다른 테이블 이름을 사용하려면 다음 형식으로 topic2TableMap 속성을 설정하세요.
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
표 생성
BigQuery 싱크 커넥터는 대상 테이블이 없는 경우 대상 테이블을 만들 수 있습니다.
autoCreateTables=true인 경우 커넥터는 존재하지 않는 BigQuery 테이블을 만들려고 시도합니다. 이 설정은 기본 동작입니다.autoCreateTables=false인 경우 커넥터는 테이블을 만들지 않습니다. 대상 테이블이 없으면 오류가 발생합니다.
autoCreateTables이 true인 경우 커넥터가 새 테이블을 만들고 구성하는 방식을 더 세부적으로 제어하기 위해 다음 구성 속성을 사용할 수 있습니다.
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
이러한 속성에 대한 자세한 내용은 BigQuery 싱크 커넥터 구성을 참고하세요.
Kafka 메타데이터
각각 kafkaDataFieldName 및 kafkaKeyFieldName 필드를 구성하여 메타데이터 정보 및 키 정보와 같은 Kafka의 추가 데이터를 BigQuery 테이블에 매핑할 수 있습니다. 메타데이터 정보의 예로는 Kafka 주제, 파티션, 오프셋, 삽입 시간이 있습니다.