Pub/Sub 싱크 커넥터는 Kafka 주제에서 Pub/Sub 주제로 메시지를 스트리밍합니다. 이를 통해 Kafka 기반 애플리케이션을 Pub/Sub와 통합하여 이벤트 기반 아키텍처와 실시간 데이터 처리를 촉진할 수 있습니다.
시작하기 전에
Pub/Sub 싱크 커넥터를 만들기 전에 다음 사항을 확인하세요.
Connect 클러스터의 Apache Kafka용 관리형 서비스 클러스터를 만듭니다. Connect 클러스터와 연결된 기본 Kafka 클러스터입니다. 커넥터 파이프라인의 한쪽 끝을 형성하는 소스 클러스터이기도 합니다.
Pub/Sub 싱크 커넥터를 호스팅할 Connect 클러스터를 만듭니다.
소스 클러스터 내에서 Kafka 주제를 만들고 구성합니다. 데이터가 이 Kafka 주제에서 대상 Pub/Sub 주제로 이동합니다.
필수 역할 및 권한
Pub/Sub 싱크 커넥터를 만드는 데 필요한 권한을 얻으려면 관리자에게 Connect 클러스터가 포함된 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.
-
관리형 Kafka 커넥터 편집자 (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Pub/Sub 게시자 (
roles/pubsub.publisher)
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
이러한 사전 정의된 역할에는 Pub/Sub 싱크 커넥터를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.
필수 권한
Pub/Sub 싱크 커넥터를 만들려면 다음 권한이 필요합니다.
-
상위 연결 클러스터에 커넥터 생성 권한을 부여합니다.
managedkafka.connectors.create
커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.
관리형 Kafka 커넥터 편집자 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.
Apache Kafka용 관리형 서비스 클러스터가 Connect 클러스터와 동일한 프로젝트에 있는 경우 추가 권한이 필요하지 않습니다. Connect 클러스터가 다른 프로젝트에 있는 경우 다른 프로젝트에서 Connect 클러스터 만들기를 참고하세요.
Pub/Sub 주제에 게시할 권한 부여
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com 형식을 따르는 Connect 클러스터 서비스 계정에는 Pub/Sub 주제에 메시지를 게시할 권한이 필요합니다. 이렇게 하려면 Pub/Sub 주제가 포함된 프로젝트에서 Connect 클러스터 서비스 계정에 Pub/Sub 게시자 역할 (roles/pubsub.publisher)을 부여합니다.
Pub/Sub 싱크 커넥터 작동 방식
Pub/Sub 싱크 커넥터는 하나 이상의 Kafka 주제에서 메시지를 가져와 Pub/Sub 주제에 게시합니다.
Pub/Sub 싱크 커넥터가 데이터를 복사하는 방식에 대한 자세한 내용은 다음을 참고하세요.
커넥터는 소스 클러스터 내의 하나 이상의 Kafka 주제에서 메시지를 소비합니다.
커넥터는
cps.topic구성 속성을 사용하여 지정된 타겟 Pub/Sub 주제 ID에 메시지를 씁니다. 필수 속성입니다.커넥터는 Pub/Sub 주제가 포함된 Google Cloud 프로젝트가
cps.project구성 속성을 사용하여 지정되도록 요구합니다. 필수 속성입니다.커넥터는
cps.endpoint속성을 사용하여 지정된 맞춤 Pub/Sub 엔드포인트를 선택적으로 사용할 수도 있습니다. 기본 엔드포인트는"pubsub.googleapis.com:443"입니다.성능을 최적화하기 위해 커넥터는 Pub/Sub에 메시지를 게시하기 전에 메시지를 버퍼링합니다.
maxBufferSize,maxBufferBytes,maxDelayThresholdMs,maxOutstandingRequestBytes,maxOutstandingMessages을 구성하여 버퍼링을 제어할 수 있습니다.Kafka 레코드에는 헤더, 키, 값의 세 가지 구성요소가 있습니다. 커넥터는 키 및 값 변환기를 사용하여 Kafka 메시지 데이터를 Pub/Sub에서 예상하는 형식으로 변환합니다. 구조체 또는 맵 값 스키마를 사용할 때
messageBodyName속성은 Pub/Sub 메시지 본문으로 사용할 필드 또는 키를 지정합니다.커넥터는
metadata.publish속성을true로 설정하여 Kafka 주제, 파티션, 오프셋, 타임스탬프를 메시지 속성으로 포함할 수 있습니다.커넥터는
headers.publish속성을true로 설정하여 Kafka 메시지 헤더를 Pub/Sub 메시지 속성으로 포함할 수 있습니다.커넥터는
orderingKeySource속성을 사용하여 Pub/Sub 메시지의 순서 지정 키를 포함할 수 있습니다. 값의 옵션에는"none"(기본값),"key","partition"가 포함됩니다.tasks.max속성은 커넥터의 동시 로드 수준을 제어합니다.tasks.max를 늘리면 처리량이 향상될 수 있지만 실제 병렬 처리는 Kafka 주제의 파티션 수로 제한됩니다.
Pub/Sub 싱크 커넥터의 속성
Pub/Sub 싱크 커넥터를 만들 때는 다음 속성을 지정해야 합니다.
커넥터 이름
Connect 클러스터 내 커넥터의 고유한 이름입니다. 리소스 이름 지정에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요.
커넥터 플러그인 유형
커넥터 플러그인 유형으로 Pub/Sub 싱크를 선택합니다. 이는 Kafka에서 Pub/Sub로의 데이터 흐름 방향과 사용되는 특정 커넥터 구현을 결정합니다. 사용자 인터페이스를 사용하여 커넥터를 구성하지 않는 경우 커넥터 클래스도 지정해야 합니다.
Kafka 주제
커넥터가 메시지를 소비하는 Kafka 주제입니다.
하나 이상의 주제를 지정하거나 정규 표현식을 사용하여 여러 주제를 일치시킬 수 있습니다. 예를 들어 'topic'으로 시작하는 모든 주제를 일치시키려면 topic.*를 사용합니다. 이러한 주제는 Connect 클러스터와 연결된 Apache Kafka용 관리형 서비스 클러스터 내에 있어야 합니다.
Pub/Sub 주제
커넥터가 메시지를 게시하는 기존 Pub/Sub 주제입니다. 시작하기 전에에 설명된 대로 Connect 클러스터 서비스 계정에 주제 프로젝트에 대한 roles/pubsub.publisher 역할이 있는지 확인합니다.
구성
이 섹션에서는 커넥터별 추가 구성 속성을 지정할 수 있습니다.
Kafka 주제의 데이터는 Avro, JSON, 원시 바이트 등 다양한 형식일 수 있으므로 구성의 핵심 부분은 변환기를 지정하는 것입니다. 변환기는 Kafka 주제에 사용된 형식의 데이터를 Kafka Connect의 표준화된 내부 형식으로 변환합니다. 그런 다음 Pub/Sub 싱크 커넥터가 이 내부 데이터를 가져와 Pub/Sub에 필요한 형식으로 변환한 후 작성합니다.
Kafka Connect의 변환기 역할, 지원되는 변환기 유형, 일반적인 구성 옵션에 관한 일반적인 내용은 변환기를 참고하세요.
다음은 Pub/Sub 싱크 커넥터와 관련된 몇 가지 구성입니다.
cps.project: Pub/Sub 주제가 포함된 Google Cloud 프로젝트 ID를 지정합니다.cps.topic: 데이터가 게시되는 Pub/Sub 주제를 지정합니다.cps.endpoint: 사용할 Pub/Sub 엔드포인트를 지정합니다.
이 커넥터에 사용할 수 있는 구성 속성 목록은 Pub/Sub 싱크 커넥터 구성을 참고하세요.
Pub/Sub 싱크 커넥터 만들기
커넥터를 만들기 전에 Pub/Sub 싱크 커넥터의 속성 문서를 검토하세요.
콘솔
Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.
커넥터를 만들려는 Connect 클러스터를 클릭합니다.
클러스터 연결 세부정보 페이지가 표시됩니다.
커넥터 만들기를 클릭합니다.
Kafka 커넥터 만들기 페이지가 표시됩니다.
커넥터 이름에 문자열을 입력합니다.
커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요.
커넥터 플러그인에서 Pub/Sub 싱크를 선택합니다.
주제에서 Kafka 주제 목록 선택 또는 주제 정규식 사용을 선택합니다. 그런 다음 이 커넥터가 메시지를 사용하는 Kafka 주제를 선택하거나 입력합니다. 이러한 주제는 연결된 Kafka 클러스터에 있습니다.
Cloud Pub/Sub 주제 선택에서 이 커넥터가 메시지를 게시할 Pub/Sub 주제를 선택합니다. 주제는 전체 리소스 이름 형식(
projects/{project}/topics/{topic})으로 표시됩니다.(선택사항) 구성 섹션에서 추가 설정을 구성합니다. 이전 섹션에서 설명한 대로
tasks.max,key.converter,value.converter과 같은 속성을 지정합니다.작업 재시작 정책을 선택합니다. 자세한 내용은 작업 다시 시작 정책을 참고하세요.
만들기를 클릭합니다.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connectors create명령어를 실행합니다.gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE다음을 바꿉니다.
CONNECTOR_ID: 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.
LOCATION: 커넥터를 만드는 위치입니다. 이 위치는 Connect 클러스터를 만든 위치와 동일해야 합니다.
CONNECT_CLUSTER_ID: 커넥터가 생성된 Connect 클러스터의 ID입니다.
CONFIG_FILE: BigQuery 싱크 커넥터의 YAML 구성 파일 경로입니다.
다음은 Pub/Sub 싱크 커넥터의 구성 파일 예시입니다.
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"다음을 바꿉니다.
CPS_SINK_CONNECTOR_ID: Pub/Sub 싱크 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.
GMK_TOPIC_ID: Pub/Sub 싱크 커넥터가 데이터를 읽어오는 Managed Service for Apache Kafka 주제의 ID입니다.
CPS_TOPIC_ID: 데이터가 게시되는 Pub/Sub 주제의 ID입니다.
GCP_PROJECT_ID: Pub/Sub 주제가 있는 Google Cloud프로젝트의 ID입니다.
Terraform
Terraform 리소스를 사용하여 커넥터를 만들 수 있습니다.
Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.
Go
이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
자바
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
Python
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
커넥터를 만든 후에는 커넥터를 수정, 삭제, 일시중지, 중지 또는 다시 시작할 수 있습니다.