Pub/Sub 소스 커넥터는 Pub/Sub에서 Kafka로 메시지를 스트리밍합니다. 이를 통해 Pub/Sub를 Kafka 기반 애플리케이션 및 데이터 파이프라인과 통합할 수 있습니다.
커넥터는 Pub/Sub 구독에서 메시지를 읽고 각 메시지를 Kafka 레코드로 변환한 후 레코드를 Kafka 주제에 씁니다. 기본적으로 커넥터는 다음과 같이 Kafka 레코드를 만듭니다.
- Kafka 레코드 키는
null입니다. - Kafka 레코드 값은 Pub/Sub 메시지 데이터를 바이트로 나타낸 것입니다.
- Kafka 레코드 헤더가 비어 있습니다.
하지만 이 동작은 구성할 수 있습니다. 자세한 내용은 커넥터 구성을 참고하세요.
시작하기 전에
Pub/Sub 소스 커넥터를 만들기 전에 다음 사항을 확인하세요.
구독이 있는 Pub/Sub 주제
Kafka 클러스터 내의 Kafka 주제입니다.
Connect cluster Connect 클러스터를 만들 때 Apache Kafka용 관리형 서비스 클러스터를 기본 Kafka 클러스터로 설정합니다.
필수 역할 및 권한
Pub/Sub 소스 커넥터를 만드는 데 필요한 권한을 얻으려면 관리자에게 Connect 클러스터가 포함된 프로젝트에 대한 관리형 Kafka 커넥터 편집자 (roles/managedkafka.connectorEditor) IAM 역할을 부여해 달라고 요청하세요.
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
이 사전 정의된 역할에는 Pub/Sub 소스 커넥터를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.
필수 권한
Pub/Sub 소스 커넥터를 만들려면 다음 권한이 필요합니다.
-
상위 Connect 클러스터에 커넥터 생성 권한을 부여합니다.
managedkafka.connectors.create
커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.
관리형 Kafka 커넥터 편집자 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.
Apache Kafka용 관리형 서비스 클러스터가 Connect 클러스터와 동일한 프로젝트에 있는 경우 추가 권한이 필요하지 않습니다. Connect 클러스터가 다른 프로젝트에 있는 경우 다른 프로젝트에서 Connect 클러스터 만들기를 참고하세요.
Pub/Sub에서 읽기 권한 부여
관리형 Kafka 서비스 계정에는 Pub/Sub 구독에서 메시지를 읽을 수 있는 권한이 있어야 합니다. Pub/Sub 구독이 포함된 프로젝트의 서비스 계정에 다음 IAM 역할을 부여합니다.
- Pub/Sub 구독자 (
roles/pubsub.subscriber) - Pub/Sub 뷰어(
roles/pubsub.viewer)
관리 Kafka 서비스 계정의 형식은 service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com입니다.
PROJECT_NUMBER를 프로젝트 번호로 바꿉니다.
Pub/Sub 소스 커넥터 만들기
콘솔
Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.
커넥터를 만들려는 Connect 클러스터를 클릭합니다.
커넥터 만들기를 클릭합니다.
커넥터 이름에 문자열을 입력합니다.
커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요.
커넥터 플러그인에서 Pub/Sub 소스를 선택합니다.
Cloud Pub/Sub 구독 목록에서 Pub/Sub 구독을 선택합니다. 커넥터는 이 구독에서 메시지를 가져옵니다. 구독이 전체 리소스 이름(
projects/{project}/subscriptions/{subscription})으로 표시됩니다.Kafka 주제 목록에서 메시지가 작성되는 Kafka 주제를 선택합니다.
선택사항: 구성 상자에서 구성 속성을 추가하거나 기본 속성을 수정합니다. 자세한 내용은 커넥터 구성을 참고하세요.
작업 재시작 정책을 선택합니다. 자세한 내용은 작업 다시 시작 정책을 참고하세요.
만들기를 클릭합니다.
gcloud
gcloud managed-kafka connectors create명령어를 실행합니다.gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE다음을 바꿉니다.
CONNECTOR_ID: 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.
LOCATION: Connect 클러스터의 위치입니다.
CONNECT_CLUSTER_ID: 커넥터가 생성된 Connect 클러스터의 ID입니다.
CONFIG_FILE: YAML 또는 JSON 구성 파일의 경로입니다.
다음은 구성 파일의 예시입니다.
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
다음을 바꿉니다.
PROJECT_ID: Pub/Sub 구독이 있는 Google Cloud 프로젝트의 ID입니다.
PUBSUB_SUBSCRIPTION_ID: 데이터를 가져올 Pub/Sub 구독의 ID입니다.
KAFKA_TOPIC_ID: 데이터가 작성되는 Kafka 주제의 ID입니다.
cps.project, cps.subscription, kafka.topic 구성 속성은 필수입니다. 추가 구성 옵션은 커넥터 구성을 참고하세요.
Terraform
Terraform 리소스를 사용하여 커넥터를 만들 수 있습니다.
Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.
Go
이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
자바
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
Python
이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.
Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.
커넥터를 만든 후에는 커넥터를 수정, 삭제, 일시중지, 중지 또는 다시 시작할 수 있습니다.
커넥터 구성
이 섹션에서는 커넥터에서 설정할 수 있는 몇 가지 구성 속성을 설명합니다.
이 커넥터에만 해당하는 속성의 전체 목록은 Pub/Sub 소스 커넥터 구성을 참고하세요.
풀 모드
가져오기 모드는 커넥터가 Pub/Sub 메시지를 가져오는 방법을 지정합니다. 다음 모드가 지원됩니다.
풀 모드 (기본값) 메시지가 일괄적으로 가져옵니다. 이 모드를 사용 설정하려면
cps.streamingPull.enabled=false.를 설정하세요. 배치 크기를 구성하려면cps.maxBatchSize속성을 설정하세요.풀 모드에 관한 자세한 내용은 풀 API를 참고하세요.
스트리밍 풀 모드 Pub/Sub에서 메시지를 가져올 때 최대 처리량과 최저 지연 시간을 지원합니다. 이 모드를 사용 설정하려면
cps.streamingPull.enabled=true를 설정합니다.스트리밍 풀 모드에 대한 자세한 내용은 StreamingPull API를 참고하세요.
스트리밍 풀이 사용 설정된 경우 다음 구성 속성을 설정하여 성능을 조정할 수 있습니다.
cps.streamingPull.flowControlBytes: 태스크당 미처리 메시지 바이트의 최대 수입니다.cps.streamingPull.flowControlMessages: 태스크당 미처리 메시지의 최대 수입니다.cps.streamingPull.maxAckExtensionMs: 커넥터가 구독 기한을 연장하는 최대 시간(밀리초)입니다.cps.streamingPull.maxMsPerAckExtension: 커넥터가 연장당 구독 기한을 연장하는 최대 시간(밀리초)입니다.cps.streamingPull.parallelStreams: 구독에서 메시지를 가져올 스트림 수입니다.
Pub/Sub 엔드포인트
기본적으로 커넥터는 전역 Pub/Sub 엔드포인트를 사용합니다. 엔드포인트를 지정하려면 cps.endpoint 속성을 엔드포인트 주소로 설정합니다.
엔드포인트에 대한 자세한 내용은 Pub/Sub 엔드포인트를 참고하세요.
Kafka 레코드
Pub/Sub 소스 커넥터는 Pub/Sub 메시지를 Kafka 레코드로 변환합니다. 다음 섹션에서는 변환 프로세스를 설명합니다.
레코드 키
키 변환기는 org.apache.kafka.connect.storage.StringConverter이어야 합니다.
기본적으로 레코드 키는
null입니다.Pub/Sub 메시지 속성을 키로 사용하려면
kafka.key.attribute을 속성 이름으로 설정합니다. 예를 들면kafka.key.attribute=username입니다.Pub/Sub 순서 키를 키로 사용하려면
kafka.key.attribute=orderingKey를 설정합니다.
레코드 헤더
기본적으로 레코드 헤더는 비어 있습니다.
kafka.record.headers가 true인 경우 Pub/Sub 메시지 속성은 레코드 헤더로 작성됩니다. 순서 키를 포함하려면 cps.makeOrderingKeyAttribute=true을 설정합니다.
레코드 값
kafka.record.headers가 true이거나 Pub/Sub 메시지에 커스텀 속성이 없는 경우 레코드 값은 메시지 데이터(바이트 배열)입니다.
값 변환기를 org.apache.kafka.connect.converters.ByteArrayConverter로 설정합니다.
그렇지 않고 kafka.record.headers가 false이며 메시지에 하나 이상의 맞춤 속성이 있는 경우 커넥터는 레코드 값을 struct로 씁니다. 값 변환기를 org.apache.kafka.connect.json.JsonConverter로 설정합니다.
struct에는 다음과 같은 필드가 포함됩니다.
message: Pub/Sub 메시지 데이터(바이트)각 Pub/Sub 메시지 속성의 필드입니다. 순서 지정 키를 포함하려면
cps.makeOrderingKeyAttribute=true를 설정합니다.
예를 들어 메시지에 username 속성이 있다고 가정해 보겠습니다.
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
value.converter.schemas.enable이 true이면 struct에 페이로드와 스키마가 모두 포함됩니다.
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Kafka 파티션
기본적으로 커넥터는 주제의 단일 파티션에 씁니다. 커넥터가 쓰는 파티션 수를 지정하려면 kafka.partition.count 속성을 설정합니다. 값은 주제의 파티션 수를 초과해서는 안 됩니다.
커넥터가 파티션에 메시지를 할당하는 방법을 지정하려면 kafka.partition.scheme 속성을 설정합니다. 자세한 내용은 Pub/Sub 소스 커넥터 구성을 참고하세요.