연결 클러스터 만들기

Connect 클러스터는 기존 Kafka 배포에서 Apache Kafka용 Google Cloud 관리형 서비스 클러스터로 데이터를 이동하거나 Apache Kafka용 관리형 서비스 클러스터에서 다른 Google Cloud 서비스 또는 다른 Kafka 클러스터로 데이터를 이동하는 데 도움이 되는 커넥터 환경을 제공합니다. 보조 Kafka 클러스터는 또 다른 Apache Kafka용 Google Cloud 관리형 서비스 클러스터, 자체 관리형 클러스터 또는 온프레미스 클러스터일 수 있습니다.

시작하기 전에

Apache Kafka용 관리형 서비스 클러스터를 이미 만들었는지 확인합니다. Connect 클러스터가 연결될 Apache Kafka용 관리형 서비스 클러스터의 이름이 필요합니다.

각 Connect 클러스터는 Apache Kafka용 관리형 서비스 클러스터와 연결됩니다. 이 클러스터는 Connect 클러스터에서 실행되는 커넥터의 상태를 저장합니다.

Connect 클러스터를 만드는 데 필요한 역할 및 권한

Connect 클러스터를 만드는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka Connect 클러스터 편집자 (roles/managedkafka.connectClusterEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 Connect 클러스터를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

Connect 클러스터를 만들려면 다음 권한이 필요합니다.

  • 지정된 위치에 Connect 클러스터 생성 권한을 부여합니다. managedkafka.connectClusters.create

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

이 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.

필수 ACL 보안 주체

기본적으로 Apache Kafka용 관리형 서비스 클러스터는 ACL이 구성되지 않은 경우 Connect 클러스터가 리소스에 액세스하도록 허용합니다. 기본 설정인 allow.everyone.if.no.acl.foundtrue로 설정하면 됩니다.

하지만 Apache Kafka용 관리형 서비스 클러스터에 ACL이 구성된 경우 Connect 클러스터에는 리소스에 대한 읽기 및 쓰기 권한이 자동으로 부여되지 않습니다. 수동으로 부여해야 합니다.

ACL에서 주 구성원으로 사용되는 Connect 클러스터 서비스 계정은 User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com 형식을 따릅니다.

Kafka 클러스터에 ACL을 구성한 경우 다음 명령어를 사용하여 Connect 클러스터에 주제에 대한 읽기 및 쓰기 권한과 소비자 그룹에 대한 읽기 권한을 부여합니다.

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

이러한 명령어에 대한 자세한 내용은 세부적인 액세스 제어를 위해 Apache Kafka ACL 구성을 참고하세요.

다른 프로젝트에서 Connect 클러스터 만들기

Connect 클러스터를 만들면 동일한 프로젝트에 있는 Apache Kafka용 관리형 서비스 클러스터와 동일한 서비스 에이전트를 공유합니다. 이 Apache Kafka용 관리형 서비스 클러스터가 Connect 클러스터에 연결된 기본 Kafka 클러스터로 지정된 경우 추가 권한이 필요하지 않습니다.

서비스 에이전트는 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com 형식입니다. 프로젝트 번호는 Connect 클러스터와 Apache Kafka용 관리형 서비스 클러스터가 포함된 프로젝트의 번호입니다.

Connect 클러스터가 A 프로젝트에 있고 연결된 Apache Kafka용 관리형 서비스 클러스터가 B 프로젝트에 있는 경우 다음 단계를 따르세요.

  1. 프로젝트 A와 프로젝트 B 모두에 Managed Kafka API가 사용 설정되어 있는지 확인합니다.

    API 사용 설정하기

  2. A 프로젝트에서 Connect 클러스터의 서비스 에이전트를 식별합니다.

    서비스 에이전트는 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com 형식입니다.

  3. B 프로젝트에서 Connect 클러스터의 서비스 계정에 관리형 Kafka 클라이언트 역할 (roles/managedkafka.client)을 부여합니다.

    이 역할은 Managed Service for Apache Kafka 클러스터에 연결하고 데이터를 읽고 쓰는 등의 작업을 실행하는 데 필요한 권한을 부여합니다.

    역할을 부여하는 방법에 대한 자세한 내용은 서비스 에이전트 만들기 및 역할 부여를 참고하세요.

권한을 부여할 때는 항상 최소 권한의 원칙을 따르세요. 보안을 보장하고 무단 액세스를 방지하는 데 필요한 권한만 부여합니다.

연결 클러스터의 속성

이 섹션에서는 Connect 클러스터의 속성을 설명합니다.

Connect 클러스터 이름

생성 중인 Connect 클러스터의 이름입니다. Connect 클러스터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 클러스터 이름은 변경할 수 없습니다.

기본 Kafka 클러스터

Connect 클러스터와 연결된 Apache Kafka용 관리형 서비스 클러스터입니다. 이 연결된 클러스터 (기본 클러스터)는 Connect 클러스터에서 실행되는 커넥터의 상태를 저장합니다. 일반적으로 기본 Apache Kafka용 관리형 서비스 클러스터는 Connect 클러스터에서 실행되는 모든 소스 커넥터의 대상이자 모든 싱크 커넥터의 입력으로도 사용됩니다.

단일 Managed Service for Apache Kafka 클러스터에는 여러 Connect 클러스터가 있을 수 있습니다. 다른 프로젝트에서 Apache Kafka용 관리형 서비스 클러스터를 선택하는 경우 적절한 권한이 구성되어 있는지 확인하세요.

Connect 클러스터를 만든 후에는 다른 Kafka 클러스터로 업데이트할 수 없습니다.

지연 시간 및 네트워크 비용을 위한 리전 코로케이션 이점

Apache Kafka용 관리형 서비스 및 Connect 클러스터를 동일한 리전에 공동 배치하면 지연 시간과 네트워크 비용이 줄어듭니다. 예를 들어 Apache Kafka용 관리형 서비스 클러스터가 region-a에 있고 싱크 커넥터를 사용하여 이 Apache Kafka용 관리형 서비스 클러스터 (소스)의 데이터를 region-a에도 있는 BigQuery 테이블 (싱크)에 쓴다고 가정해 보겠습니다. region-a에 Connect 클러스터를 배포하면 BigQuery 쓰기 작업의 지연 시간이 최소화되고 Apache Kafka용 관리형 서비스 클러스터와 Connect 클러스터 간의 리전 간 네트워크 전송 비용이 발생하지 않습니다.

다중 시스템 지연 시간 및 비용 고려사항

Kafka Connect는 커넥터를 사용하여 시스템 간에 데이터를 이동합니다. 커넥터의 한쪽은 항상 Apache Kafka용 관리형 서비스 클러스터와 상호작용합니다. 단일 Kafka Connect 클러스터는 여러 커넥터를 실행할 수 있으며 각 커넥터는 소스 (시스템에서 데이터를 가져옴) 또는 싱크 (시스템에 데이터를 푸시) 역할을 합니다.

Apache Kafka용 관리형 서비스 클러스터와 동일한 리전에 있는 Connect 클러스터는 클러스터 간 통신 지연 시간이 짧다는 이점이 있지만 각 커넥터는 BigQuery 테이블이나 다른 Kafka 클러스터와 같은 다른 시스템과도 상호작용합니다. Connect 클러스터와 Managed Service for Apache Kafka 클러스터가 동일한 위치에 있더라도 다른 시스템은 다른 리전에 있을 수 있습니다. 이로 인해 지연 시간과 비용이 증가합니다. 전체 파이프라인 지연 시간은 Apache Kafka용 관리형 서비스 클러스터, Connect 클러스터, 소스 또는 싱크 시스템 등 세 시스템의 위치에 따라 달라집니다.

예를 들어 Apache Kafka용 관리형 서비스 클러스터가 region-a에 있고, Connect 클러스터가 region-b에 있으며, region-c의 버킷에 Cloud Storage 커넥터를 사용하는 경우 네트워크 홉 2개 (region-a에서 region-b로, region-b에서 region-c로 또는 커넥터 방향에 따라 그 반대)에 대한 요금이 청구됩니다.

지연 시간과 비용을 모두 최적화하려면 Connect 클러스터 배치를 계획할 때 관련된 모든 리전을 신중하게 고려하세요.

용량 구성

용량을 구성하려면 Connect 클러스터의 각 vCPU에 대한 vCPU 수와 메모리 양을 구성해야 합니다. 연결 클러스터를 만든 후 용량을 업데이트할 수 있습니다. 용량 구성의 속성은 다음과 같습니다.

  • vCPU: Connect 클러스터에 할당된 vCPU 수입니다. 최솟값은 vCPU 3개입니다.

  • 메모리: 각 vCPU에 할당된 메모리 양입니다. vCPU당 1GiB에서 8GiB 사이로 프로비저닝해야 합니다. 클러스터를 만든 후에는 이러한 한도 내에서 메모리 양을 늘리거나 줄일 수 있습니다.

    예를 들어 vCPU가 6개인 클러스터를 만드는 경우 클러스터에 할당할 수 있는 최소 메모리는 6GiB (vCPU당 1GiB)이고 최대 메모리는 48GiB (vCPU당 8GiB)입니다.

Connect 클러스터의 각 작업자에 할당된 vCPU와 메모리는 클러스터의 성능, 용량, 비용에 큰 영향을 미칩니다. vCPU와 메모리가 Connect 클러스터에 미치는 영향은 다음과 같습니다.

vCPU 수

  • Kafka Connect는 커넥터의 작업을 태스크로 나눕니다. 각 태스크는 데이터를 병렬로 처리할 수 있습니다. vCPU가 많을수록 더 많은 작업을 동시에 실행할 수 있으므로 처리량이 높아집니다.

  • vCPU를 늘리면 Connect 클러스터의 비용이 증가합니다.

메모리

  • Kafka Connect는 커넥터와 Apache Kafka용 관리형 서비스 간에 데이터가 흐를 때 데이터를 버퍼링하기 위해 메모리를 사용합니다. 메모리가 클수록 버퍼가 커집니다. 메모리가 크면 특히 대량 데이터 스트림의 처리량을 개선할 수 있습니다. 매우 큰 메시지나 레코드를 처리하는 커넥터는 OutOfMemoryError 예외가 발생하지 않고 처리할 수 있을 만큼 충분한 메모리가 필요합니다.

  • 메모리가 많을수록 Connect 클러스터의 비용이 증가합니다.

  • 변환 로직을 많이 사용하는 경우 메모리 할당이 더 많이 필요합니다.

목표는 Connect 클러스터에 적합한 용량 구성을 선택하는 것입니다. 이를 위해서는 Connect 클러스터에서 처리할 수 있는 처리량을 이해해야 합니다.

작업자 (기본) 서브넷

작업자 서브넷(기본 서브넷이라고도 함)은 VPC 네트워크를 Connect 클러스터에 연결합니다. 이 서브넷을 사용하면 클러스터 작업자가 소비자 네트워크의 소스 및 싱크 엔드포인트(예: Managed Service for Apache Kafka 클러스터 또는 자체 호스팅 Kafka 클러스터)에 도달할 수 있습니다.

다음은 작업자 서브넷 구성에 관한 몇 가지 요구사항입니다.

  • 작업자 서브넷은 필수입니다.

  • 서브넷은 Connect 클러스터와 동일한 리전에 있어야 합니다.

  • 서브넷은 연결된 서브넷 목록에 있는 기본 Kafka 클러스터 중 하나와 동일한 상위 VPC에 있어야 합니다.

  • 서브넷 CIDR 범위의 최소 크기는 /22 (1024개 주소)여야 합니다.

클러스터 작업자에는 Private Service Connect 인터페이스를 사용하여 작업자 서브넷의 IP 주소가 할당됩니다. 작업자는 서브넷의 VPC 네트워크에서 액세스할 수 있는 모든 네트워크 대상에 연결할 수 있으며, 다음 요구사항을 충족해야 합니다.

  • 엔드포인트는 172.16.0.0/14 CIDR 범위에 속하면 안 됩니다. 이 범위는 Apache Kafka Connect용 관리형 서비스 내부용으로 예약되어 있습니다.
  • 방화벽 규칙이 트래픽을 허용해야 합니다. 네트워크 연결에 대한 보안 구성을 참고하세요.
  • 인터넷 트래픽의 경우 Cloud NAT를 구성해야 합니다. 예를 들어 MirrorMaker 커넥터가 인터넷을 통해 액세스할 수 있는 Kafka 클러스터에서 데이터를 복제하려면 Cloud NAT가 필요합니다.
  • 작업자 서브넷 VPC와 다른 VPC에 있는 Private Service Connect 엔드포인트에 액세스하려면 지원되는 소비자 구성 (예: NCC)을 사용해야 합니다. 자세한 내용은 엔드포인트를 통해 게시된 서비스 액세스 정보를 참고하세요.

변환 가능한 DNS 도메인

확인 가능한 DNS 도메인(DNS 도메인 이름이라고도 함)을 사용하면 소비자 VPC 네트워크의 DNS 주소를 테넌트 VPC에서 사용할 수 있습니다. 이를 통해 Connect 클러스터가 DNS 이름을 IP 주소로 변환하여 MirrorMaker 커넥터의 다른 Kafka 클러스터를 비롯한 다른 서비스와의 통신을 용이하게 할 수 있습니다.

해결 가능한 DNS 도메인의 경우 Apache Kafka용 관리형 서비스 클러스터를 선택할 수 있습니다. 기본 Apache Kafka용 관리형 서비스 클러스터의 부트스트랩 주소가 변환 가능한 DNS 도메인 목록에 자동으로 포함되므로 기본 Apache Kafka용 관리형 서비스 클러스터의 DNS 도메인 이름을 구성할 필요가 없습니다.

하지만 DNS 도메인을 수동으로 지정할 수도 있습니다. 외부 Kafka 클러스터를 선택하는 경우 DNS 도메인을 수동으로 지정해야 합니다. 기본 Managed Service for Apache Kafka 클러스터의 DNS 도메인이 자동으로 포함됩니다. 다른 Kafka 클러스터에서는 여전히 DNS 도메인을 구성해야 합니다.

Secret Manager 리소스

작업자에 로드할 Secret Manager를 지정합니다. 이러한 보안 비밀은 Secret Manager에 안전하게 저장되고 Connect 클러스터에서 사용할 수 있습니다.

커넥터 구성에서 Secret Manager를 선택적으로 사용할 수 있습니다. 예를 들어 Connect 클러스터에 키 파일을 로드하고 커넥터가 파일을 읽도록 할 수 있습니다. Secret Manager는 작업자에 파일로 마운트됩니다.

클러스터 연결은 Secret Manager와 직접 통합됩니다. Secret Manager를 사용하여 보안 비밀을 저장하고 관리해야 합니다.

보안 비밀을 지정하는 형식은 projects/{PROJECT_ID}/secrets/{SECRET_NAME}/versions/{VERSION_ID}입니다.

  • PROJECT_ID: Secret Manager 보안 비밀이 있는 프로젝트의 ID입니다.

  • SECRET_NAME: Secret Manager의 보안 비밀 이름

  • VERSION_ID: 보안 비밀의 특정 버전 번호입니다. '1', '2', '3'과 같은 숫자입니다.

단일 Connect 클러스터에 최대 32개의 보안 비밀을 로드할 수 있습니다.

Connect 워커를 실행하는 서비스 에이전트에 사용하려는 보안 비밀에 대한 secretmanager.secretAccessor 역할 (Secret Manager 보안 비밀 접근자)이 있는지 확인합니다. 이 역할을 사용하면 Connect 클러스터가 Secret Manager에서 보안 비밀 값을 가져올 수 있습니다.

라벨

라벨은 구성 및 식별에 도움이 되는 키-값 쌍입니다. 이를 통해 Connect 클러스터를 정리할 수 있습니다. 각 Connect 클러스터에 라벨을 연결한 후 라벨을 기준으로 리소스를 필터링할 수 있습니다. 라벨의 예는 environment:prod, application:web-app입니다.

Connect 클러스터 만들기

클러스터를 만들기 전에 연결 클러스터 속성 문서를 검토하세요.

Connect 클러스터를 만드는 데 20~30분 정도 걸립니다.

콘솔

  1. Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.

    클러스터 연결로 이동

  2. 만들기를 클릭합니다.

    Connect 클러스터 만들기 페이지가 열립니다.

  3. Connect 클러스터 이름에 문자열을 입력합니다.

    Connect 클러스터 이름을 지정하는 방법에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요.

  4. 기본 Kafka 클러스터에서 메뉴의 Apache Kafka용 관리형 서비스 클러스터를 선택합니다.

    이 Apache Kafka용 관리형 서비스 클러스터가 실행하는 기능에 대한 자세한 내용은 기본 Kafka 클러스터를 참고하세요.

  5. 위치의 경우 리전 메뉴에서 지원되는 위치를 선택하거나 기본값을 유지합니다.

    올바른 위치를 선택하는 방법에 대한 자세한 내용은 기본 Kafka 클러스터를 참고하세요.

  6. 용량 구성에서 vCPU메모리 값을 입력하거나 기본값을 유지합니다.

    vCPUs에 클러스터의 가상 CPU 수를 입력합니다.

    메모리에 CPU당 메모리 양(GiB)을 입력합니다. CPU당 메모리가 8GiB를 초과하면 오류 메시지가 표시됩니다.

    Apache Kafka용 관리형 서비스 클러스터의 크기를 조정하는 방법을 자세히 알아보려면 용량 구성을 참고하세요.

  7. 네트워크 구성의 경우 네트워크 메뉴에서 기본 Apache Kafka용 관리형 서비스 클러스터의 네트워크를 선택하거나 유지합니다.

  8. 작업자 서브넷에서 메뉴의 서브넷을 선택하거나 유지합니다.

    서브넷 URI 경로 필드가 자동으로 채워집니다. 자세한 내용은 작업자 서브넷을 참고하세요.

  9. 변환 가능한 DNS 도메인의 경우 기본 Kafka 클러스터의 DNS 도메인이 변환 가능한 DNS 도메인으로 자동으로 추가됩니다.

    DNS 도메인을 추가하려면 필요한 경우 섹션을 펼칩니다.

  10. DNS 도메인 추가를 클릭합니다.

    메뉴에서 Kafka 클러스터를 선택합니다.

    DNS 도메인이 자동으로 채워집니다. 외부 Kafka 클러스터의 DNS 도메인 이름을 입력할 수도 있습니다.

    완료를 클릭합니다.

  11. Secret Manager 리소스의 경우 필요한 경우 섹션을 펼칩니다.

  12. 보안 비밀 리소스 추가를 클릭합니다.

  13. 보안 비밀 메뉴에서 보안 비밀을 선택하고 보안 비밀 버전 메뉴에서 버전을 선택합니다. 새 보안 비밀을 만들 수도 있습니다.

    Connect 작업자를 실행하는 서비스 에이전트에 사용하려는 보안 비밀에 대한 Secret Manager 보안 비밀 접근자 역할이 있는지 확인합니다. Secret Manager에 대한 자세한 내용은 Secret Manager 리소스를 참고하세요.

  14. 완료를 클릭합니다.

  15. 보안 비밀을 더 추가해야 하는 경우 보안 비밀 리소스 추가를 클릭합니다.

  16. 라벨의 경우 필요한 경우 섹션을 펼칩니다.

    프로젝트를 구성하려면 리소스에 키-값 쌍으로 임의의 라벨을 추가하세요.

    라벨 추가를 클릭하여 다양한 환경, 서비스, 소유자, 팀 등을 포함합니다.

  17. 만들기를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka connect-clusters create 명령어를 실행합니다.

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    다음을 바꿉니다.

    • CONNECT_CLUSTER_ID: Connect 클러스터의 ID 또는 이름입니다. Connect 클러스터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. Connect 클러스터의 이름은 변경할 수 없습니다.

    • LOCATION: Connect 클러스터를 만드는 위치입니다. 지원되는 Google Cloud리전이어야 합니다. 생성 후에는 Connect 클러스터의 위치를 변경할 수 없습니다. 사용 가능한 위치 목록은 Apache Kafka용 관리형 서비스 위치를 참고하세요. 위치 추천에 대한 자세한 내용은 기본 Kafka 클러스터를 참고하세요.

    • CPU: Connect 클러스터의 vCPU 수입니다. 최솟값은 vCPU 3개입니다. vCPU 수를 참고하세요.

    • MEMORY: Connect 클러스터의 메모리 양입니다. 'MB', 'MiB', 'GB', 'GiB', 'TB', 'TiB' 단위를 사용하세요. 예를 들어 '3GiB'입니다. vCPU당 1GiB에서 8GiB 사이로 프로비저닝해야 합니다. 메모리를 참고하세요.

    • WORKER_SUBNET: Connect 클러스터의 작업자 서브넷입니다.

      서브넷의 형식은 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID입니다.

      작업자 서브넷은 Connect 클러스터와 동일한 리전에 있어야 합니다.

    • PROJECT_ID: (선택사항)Google Cloud 프로젝트의 ID입니다. 제공되지 않으면 현재 프로젝트가 사용됩니다.

    • KAFKA_CLUSTER: Connect 클러스터와 연결된 기본 Apache Kafka용 관리형 서비스 클러스터의 ID 또는 정규화된 이름입니다. Kafka 클러스터를 참고하세요. Kafka 클러스터의 형식은 projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID입니다.

      Connect 클러스터를 만든 후에는 다른 Kafka 클러스터로 업데이트할 수 없습니다.

    • SECRET: (선택사항) 작업자에 로드할 보안 비밀입니다. Secret Manager의 정확한 보안 비밀 버전을 제공해야 합니다. 별칭은 지원되지 않습니다. 하나의 클러스터에 최대 32개의 보안 비밀을 로드할 수 있습니다. 형식: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME: (선택사항) Connect Cluster에 표시할 서브넷의 DNS 도메인 이름입니다. Connect 클러스터는 IP 주소에 의존하는 대신 도메인 이름을 사용하여 리소스에 액세스할 수 있습니다. DNS 피어링을 참고하세요.

    • LABELS: (선택사항) 클러스터와 연결할 라벨입니다. 라벨 형식에 대한 자세한 내용은 라벨을 참고하세요. 추가할 라벨 KEY=VALUE 쌍의 목록입니다. 키는 소문자로 시작해야 하고 하이픈 (-), 밑줄 (_), 소문자, 숫자만 포함할 수 있습니다. 값은 하이픈(-), 밑줄(_), 소문자, 숫자만 포함해야 합니다.

    • CONFIG_FILE: (선택사항) 클러스터 또는 커넥터 기본값에서 재정의된 구성이 포함된 JSON 또는 YAML 파일의 경로입니다. 이 파일은 인라인 JSON 또는 YAML도 지원합니다.

    • --async: (선택사항) 진행 중인 작업이 완료될 때까지 기다리지 않고 즉시 반환합니다. --async 플래그를 사용하면 클러스터가 백그라운드에서 생성되는 동안 다른 작업을 계속할 수 있습니다. 이 플래그를 사용하지 않으면 시스템은 작업이 완료될 때까지 기다린 후 응답을 반환합니다. 다른 작업을 계속하려면 클러스터가 완전히 업데이트될 때까지 기다려야 합니다.

    다음과 비슷한 응답이 표시됩니다.

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    OPERATION_ID를 저장하여 진행 상황을 추적합니다. 예를 들어 여기의 값은 operation-1753590328249-63ae19098cc06-64300a0a-06512d02입니다.

  3. Terraform

    Terraform 리소스를 사용하여 Connect 클러스터를 만들 수 있습니다.

    resource "google_managed_kafka_connect_cluster" "default" {
      provider           = google-beta
      project            = data.google_project.default.project_id
      connect_cluster_id = "my-connect-cluster-id"
      location           = "us-central1"
      kafka_cluster      = google_managed_kafka_cluster.default.id
      capacity_config {
        vcpu_count   = 12
        memory_bytes = 12884901888 # 12 GiB
      }
      gcp_config {
        access_config {
          network_configs {
            primary_subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.

    Go

    이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)
    
    	// Capacity configuration with 12 vCPU and 12 GiB RAM
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   12,
    		MemoryBytes: 12884901888, // 12 GiB in bytes
    	}
    
    	// Optionally, you can also specify accessible subnets and resolvable DNS
    	// domains as part of your network configuration. For example:
    	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
    	// 	{
    	// 		PrimarySubnet:      primarySubnet,
    	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
    	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
    	// 	},
    	// }
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		KafkaCluster:   kafkaCluster,
    		CapacityConfig: capacityConfig,
    	}
    
    	req := &managedkafkapb.CreateConnectClusterRequest{
    		Parent:           locationPath,
    		ConnectClusterId: clusterID,
    		ConnectCluster:   connectCluster,
    	}
    	op, err := client.CreateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
    	return nil
    }
    

    자바

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
    import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
    import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
        int cpu = 12;
        long memoryBytes = 12884901888L; // 12 GiB
        createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
      }
    
      public static void createConnectCluster(
          String projectId,
          String region,
          String clusterId,
          String subnet,
          String kafkaCluster,
          int cpu,
          long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
            .setMemoryBytes(memoryBytes).build();
        ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
            .setPrimarySubnet(subnet)
            .build();
        // Optionally, you can also specify additional accessible subnets and resolvable
        // DNS domains as part of your network configuration. For example:
        // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
        // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
        ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
            .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setKafkaCluster(kafkaCluster)
            .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
            .create(settingsBuilder.build())) {
          CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setConnectClusterId(clusterId)
              .setConnectCluster(connectCluster)
              .build();
    
          // The duration of this operation can vary considerably, typically taking
          // between 10-30 minutes.
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .createConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(), operation.isDone(), future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone
          // = True)
          ConnectCluster response = future.get();
          System.out.printf("Created connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
              e.getMessage());
          throw e;
        }
      }
    }

    Python

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig
    
    # TODO(developer): Update with your values.
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # kafka_cluster_id = "my-kafka-cluster"
    # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 12
    # memory_bytes = 12884901888  # 12 GiB
    
    connect_client = ManagedKafkaConnectClient()
    kafka_client = managedkafka_v1.ManagedKafkaClient()
    
    parent = connect_client.common_location_path(project_id, region)
    kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    connect_cluster.kafka_cluster = kafka_cluster_path
    connect_cluster.capacity_config.vcpu_count = cpu
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
    # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
    # For example:
    # connect_cluster.gcp_config.access_config.network_configs = [
    #     ConnectNetworkConfig(
    #         primary_subnet=primary_subnet,
    #         additional_subnets=additional_subnets,
    #         dns_domain_names=dns_domain_names,
    #     )
    # ]
    
    request = CreateConnectClusterRequest(
        parent=parent,
        connect_cluster_id=connect_cluster_id,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.create_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # Creating a Connect cluster can take 10-40 minutes.
        response = operation.result(timeout=3000)
        print("Created Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

클러스터 생성 작업 모니터링

Connect 클러스터를 만들기 위해 gcloud CLI를 실행한 경우에만 다음 명령어를 실행할 수 있습니다.

  • Connect 클러스터를 만드는 데는 일반적으로 20~30분이 걸립니다. 클러스터 생성 진행 상황을 추적하기 위해 gcloud managed-kafka connect-clusters create 명령어는 장기 실행 작업 (LRO)을 사용하며, 다음 명령어를 사용하여 모니터링할 수 있습니다.

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    다음을 바꿉니다.

    • OPERATION_ID을 이전 섹션의 작업 ID 값으로 바꿉니다.
    • LOCATION을 이전 섹션의 위치 값으로 바꿉니다.

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.