Cloud Storage 싱크 커넥터를 사용하면 Kafka 주제에서 Cloud Storage 버킷으로 데이터를 스트리밍할 수 있습니다. 이는 비용 효율적이고 확장 가능한 방식으로 대량의 데이터를 저장하고 처리하는 데 유용합니다.
시작하기 전에
Cloud Storage 싱크 커넥터를 만들기 전에 다음을 확인하세요.
Connect 클러스터의 Apache Kafka용 관리형 서비스 클러스터를 만듭니다. Connect 클러스터와 연결된 기본 Kafka 클러스터입니다. 커넥터 파이프라인의 한쪽 끝을 형성하는 소스 클러스터이기도 합니다.
Cloud Storage 싱크 커넥터를 호스팅할 Connect 클러스터를 만듭니다.
Kafka에서 스트리밍된 데이터를 저장할 Cloud Storage 버킷을 만듭니다.
소스 클러스터 내에서 Kafka 주제를 만들고 구성합니다. 데이터가 이 Kafka 주제에서 대상 Cloud Storage 버킷으로 이동합니다.
필수 역할 및 권한
Cloud Storage 싱크 커넥터를 만드는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 커넥터 편집자 (roles/managedkafka.connectorEditor) IAM 역할을 부여해 달라고 요청하세요.
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
이 사전 정의된 역할에는 Cloud Storage 싱크 커넥터를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.
필수 권한
Cloud Storage 싱크 커넥터를 만들려면 다음 권한이 필요합니다.
-
상위 연결 클러스터에 커넥터 생성 권한을 부여합니다.
managedkafka.connectors.create
커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.
관리형 Kafka 커넥터 편집자 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.
Apache Kafka용 관리형 서비스 클러스터가 Connect 클러스터와 동일한 프로젝트에 있는 경우 추가 권한이 필요하지 않습니다. Connect 클러스터가 다른 프로젝트에 있는 경우 다른 프로젝트에서 Connect 클러스터 만들기를 참고하세요.
Cloud Storage 버킷에 쓸 수 있는 권한 부여
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com 형식을 따르는 Connect 클러스터 서비스 계정에는 다음 Cloud Storage 권한이 필요합니다.
storage.objects.createstorage.objects.delete
이렇게 하려면 Cloud Storage 버킷이 포함된 프로젝트에서 Connect 클러스터 서비스 계정에 스토리지 객체 사용자 (roles/storage.objectUser) 역할을 부여하세요.
Cloud Storage 싱크 커넥터 작동 방식
Cloud Storage 싱크 커넥터는 하나 이상의 Kafka 주제에서 데이터를 가져와 단일 Cloud Storage 버킷 내의 객체에 데이터를 씁니다.
Cloud Storage 싱크 커넥터가 데이터를 복사하는 방식에 대한 자세한 내용은 다음과 같습니다.
커넥터는 소스 클러스터 내의 하나 이상의 Kafka 주제에서 메시지를 소비합니다.
커넥터는 커넥터 구성에 지정된 타겟 Cloud Storage 버킷에 데이터를 씁니다.
커넥터는 커넥터 구성의 특정 속성을 참조하여 Cloud Storage 버킷에 데이터를 쓸 때 데이터를 포맷합니다. 기본적으로 출력 파일은 CSV 형식입니다.
format.output.type속성을 구성하여 JSON과 같은 다양한 출력 형식을 지정할 수 있습니다.커넥터는 Cloud Storage 버킷에 작성되는 파일의 이름도 지정합니다.
file.name.prefix및file.name.template속성을 사용하여 파일 이름을 맞춤설정할 수 있습니다. 예를 들어 파일 이름에 Kafka 주제 이름이나 메시지 키를 포함할 수 있습니다.Kafka 레코드에는 헤더, 키, 값의 세 가지 구성요소가 있습니다.
format.output.fields를 설정하여 헤더를 포함하면 출력 파일에 헤더를 포함할 수 있습니다. 예를 들면format.output.fields=value,headers입니다.key을 포함하도록format.output.fields을 설정하여 출력 파일에 키를 포함할 수 있습니다. 예를 들면format.output.fields=key,value,headers입니다.키는
file.name.template속성에key를 포함하여 레코드를 그룹화하는 데도 사용할 수 있습니다.
format.output.fields이 기본적으로value로 설정되어 있으므로 기본적으로 출력 파일에 값을 포함할 수 있습니다.커넥터는 변환되고 형식이 지정된 데이터를 지정된 Cloud Storage 버킷에 씁니다.
file.compression.type속성을 사용하여 파일 압축을 구성하면 커넥터가 Cloud Storage 버킷에 저장된 파일을 압축합니다.변환기 구성은
format.output.type속성으로 제한됩니다.예를 들어
format.output.type이csv로 설정된 경우 키 변환기는org.apache.kafka.connect.converters.ByteArrayConverter또는org.apache.kafka.connect.storage.StringConverter이어야 하고 값 변환기는org.apache.kafka.connect.converters.ByteArrayConverter이어야 합니다.format.output.type이json로 설정된 경우value.converter.schemas.enable속성이 true이더라도 값 및 키 스키마가 출력 파일의 데이터와 함께 작성되지 않습니다.
tasks.max속성은 커넥터의 동시 로드 수준을 제어합니다.tasks.max를 늘리면 처리량이 향상될 수 있지만 실제 병렬 처리는 Kafka 주제의 파티션 수에 의해 제한됩니다.
Cloud Storage 싱크 커넥터의 속성
Cloud Storage 싱크 커넥터를 만들 때 다음 속성을 지정합니다.
커넥터 이름
커넥터의 이름 또는 ID입니다. 리소스 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 이름은 변경할 수 없습니다.
커넥터 플러그인 유형
Google Cloud 콘솔에서 커넥터 플러그인 유형으로 Cloud Storage 싱크를 선택합니다. 사용자 인터페이스를 사용하여 커넥터를 구성하지 않는 경우 커넥터 클래스도 지정해야 합니다.
주제
커넥터가 메시지를 소비하는 Kafka 주제입니다.
하나 이상의 주제를 지정하거나 정규 표현식을 사용하여 여러 주제를 일치시킬 수 있습니다. 예를 들어 'topic'으로 시작하는 모든 주제를 일치시키려면 topic.*를 사용합니다. 이러한 주제는 Connect 클러스터와 연결된 Apache Kafka용 관리형 서비스 클러스터 내에 있어야 합니다.
Cloud Storage 버킷
데이터가 저장되는 Cloud Storage 버킷을 선택하거나 만듭니다.
구성
이 섹션에서는 Cloud Storage 싱크 커넥터의 추가 커넥터별 구성 속성을 지정할 수 있습니다.
Kafka 주제의 데이터는 Avro, JSON, 원시 바이트 등 다양한 형식일 수 있으므로 구성의 핵심 부분은 변환기를 지정하는 것입니다. 변환기는 Kafka 주제에 사용된 형식의 데이터를 Kafka Connect의 표준화된 내부 형식으로 변환합니다. 그런 다음 Cloud Storage 싱크 커넥터가 이 내부 데이터를 가져와서 쓰기 전에 Cloud Storage 버킷에 필요한 형식으로 변환합니다.
Kafka Connect의 변환기 역할, 지원되는 변환기 유형, 일반적인 구성 옵션에 관한 일반적인 내용은 변환기를 참고하세요.
다음은 Cloud Storage 싱크 커넥터와 관련된 몇 가지 구성입니다.
gcs.credentials.default: 실행 환경에서 Google Cloud 사용자 인증 정보를 자동으로 검색할지 여부입니다.true로 설정해야 합니다.gcs.bucket.name: 데이터가 작성되는 Cloud Storage 버킷의 이름을 지정합니다. 설정해야 합니다.file.compression.type: Cloud Storage 버킷에 저장된 파일의 압축 유형을 설정합니다. 예를 들어gzip,snappy,zstd,none가 있습니다. 기본값은none입니다.file.name.prefix: Cloud Storage 버킷에 저장된 각 파일의 이름에 추가할 접두사입니다. 기본값은 비어 있습니다.format.output.type: Cloud Storage 출력 파일에 데이터를 쓰는 데 사용되는 데이터 형식의 유형입니다. 지원되는 값은csv,json,jsonl,parquet입니다. 기본값은csv입니다.
이 커넥터에 사용할 수 있는 구성 속성 목록은 Cloud Storage 싱크 커넥터 구성을 참고하세요.
Cloud Storage 싱크 커넥터 만들기
커넥터를 만들기 전에 Cloud Storage 싱크 커넥터의 속성 문서를 검토하세요.
콘솔
Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.
커넥터를 만들려는 Connect 클러스터를 클릭합니다.
클러스터 연결 세부정보 페이지가 표시됩니다.
커넥터 만들기를 클릭합니다.
Kafka 커넥터 만들기 페이지가 표시됩니다.
커넥터 이름에 문자열을 입력합니다.
커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요.
커넥터 플러그인에서 Cloud Storage 싱크를 선택합니다.
데이터를 스트리밍할 수 있는 주제를 지정합니다.
데이터를 저장할 스토리지 버킷을 선택합니다.
(선택사항) 구성 섹션에서 추가 설정을 구성합니다.
작업 재시작 정책을 선택합니다. 자세한 내용은 작업 다시 시작 정책을 참고하세요.
만들기를 클릭합니다.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connectors create명령어를 실행합니다.gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE다음을 바꿉니다.
CONNECTOR_ID: 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.
LOCATION: 커넥터를 만드는 위치입니다. 이 위치는 Connect 클러스터를 만든 위치와 동일해야 합니다.
CONNECT_CLUSTER_ID: 커넥터가 생성된 Connect 클러스터의 ID입니다.
CONFIG_FILE: BigQuery 싱크 커넥터의 YAML 구성 파일 경로입니다.
다음은 Cloud Storage 싱크 커넥터의 구성 파일의 예입니다.
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"다음을 바꿉니다.
GMK_TOPIC_ID: 데이터가 Cloud Storage 싱크 커넥터로 흐르는 Apache Kafka용 관리형 서비스 주제의 ID입니다.
GCS_BUCKET_NAME: 파이프라인의 싱크 역할을 하는 Cloud Storage 버킷의 이름입니다.
GCS_SINK_CONNECTOR_ID: Cloud Storage 싱크 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.
Terraform
Terraform 리소스를 사용하여 커넥터를 만들 수 있습니다.
Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.
Go
이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
자바
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
Python
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
커넥터를 만든 후에는 커넥터를 수정, 삭제, 일시중지, 중지 또는 다시 시작할 수 있습니다.