연결 클러스터 만들기

Connect 클러스터는 기존 Kafka 배포에서 Google Cloud Managed Service for Apache Kafka 클러스터로 데이터를 이동하거나 Managed Service for Apache Kafka 클러스터에서 다른 Google Cloud 서비스 또는 다른 Kafka 클러스터로 데이터를 이동하는 데 도움이 되는 커넥터 환경을 제공합니다. 보조 Kafka 클러스터는 또 다른 Google Cloud Managed Service for Apache Kafka 클러스터, 자체 관리형 클러스터 또는 온프레미스 클러스터일 수 있습니다.

시작하기 전에

Managed Service for Apache Kafka 클러스터를 이미 만들었는지 확인합니다. Connect 클러스터가 연결될 Managed Service for Apache Kafka 클러스터의 이름이 필요합니다.

각 Connect 클러스터는 Managed Service for Apache Kafka 클러스터와 연결됩니다. 이 클러스터는 Connect 클러스터에서 실행되는 커넥터의 상태를 저장합니다.

Connect 클러스터를 만드는 데 필요한 역할 및 권한

Connect 클러스터를 만드는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka Connect 클러스터 편집자 (roles/managedkafka.connectClusterEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 Connect 클러스터를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

Connect 클러스터를 만들려면 다음 권한이 필요합니다.

  • 지정된 위치에 Connect 클러스터 생성 권한을 부여합니다. managedkafka.connectClusters.create

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

필수 ACL 보안 주체

기본적으로 Managed Service for Apache Kafka 클러스터는 ACL이 구성되지 않은 경우 Connect 클러스터가 리소스에 액세스하도록 허용합니다. 기본 설정인 allow.everyone.if.no.acl.foundtrue로 설정하면 됩니다.

하지만 Managed Service for Apache Kafka 클러스터에 ACL이 구성된 경우 Connect 클러스터에는 리소스에 대한 읽기 및 쓰기 권한이 자동으로 부여되지 않습니다. 수동으로 부여해야 합니다.

ACL에서 주 구성원으로 사용되는 Connect 클러스터 서비스 계정은 User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com 형식을 따릅니다.

Kafka 클러스터에 ACL을 구성한 경우 다음 명령어를 사용하여 Connect 클러스터에 주제에 대한 읽기 및 쓰기 권한과 소비자 그룹에 대한 읽기 권한을 부여합니다.

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

이러한 명령어에 대한 자세한 내용은 세부적인 액세스 제어를 위해 Apache Kafka ACL 구성을 참고하세요.

다른 프로젝트에서 Connect 클러스터 만들기

Managed Service for Apache Kafka는 서비스 에이전트를 사용하여Google Cloud 리소스에 액세스합니다. 서비스 에이전트는 클러스터를 만드는 프로젝트와 연결됩니다.

Managed Service for Apache Kafka 클러스터와 다른 프로젝트에서 Connect 클러스터를 만드는 경우 Connect 클러스터와 Kafka 클러스터는 각 프로젝트와 연결된 서비스 에이전트를 사용합니다. 이 경우 Connect 클러스터의 서비스 에이전트가 Kafka 클러스터의 프로젝트에 있는 Google Cloud 리소스에 액세스할 권한이 필요합니다.

필요한 권한을 부여하려면 Kafka 클러스터의 프로젝트에서 Connect 클러스터의 서비스 에이전트에 관리형 Kafka 서비스 에이전트 역할을 부여하세요. 예를 들어 프로젝트 kafka-project에 Kafka 클러스터를 만들고 프로젝트 connect-project에 Connect 클러스터를 만드는 경우 kafka-project의 관리형 Kafka 서비스 에이전트 역할을 connect-project와 연결된 서비스 에이전트에 부여합니다.

서비스 에이전트의 이메일 주소는 service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com 형식입니다. 여기서 PROJECT_NUMBER는 프로젝트 번호입니다. 역할을 부여하는 방법에 대한 자세한 내용은 서비스 에이전트 만들기 및 역할 부여를 참고하세요.

연결 클러스터의 속성

이 섹션에서는 Connect 클러스터의 속성을 설명합니다.

Connect 클러스터 이름

생성 중인 Connect 클러스터의 이름입니다. Connect 클러스터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 클러스터 이름은 변경할 수 없습니다.

기본 Kafka 클러스터

Connect 클러스터와 연결된 Managed Service for Apache Kafka 클러스터입니다. 이 연결된 클러스터 (기본 클러스터)는 Connect 클러스터에서 실행되는 커넥터의 상태를 저장합니다. 일반적으로 기본 Managed Service for Apache Kafka 클러스터는 Connect 클러스터에서 실행되는 모든 소스 커넥터의 대상이자 모든 싱크 커넥터의 입력으로도 사용됩니다.

단일 Managed Service for Apache Kafka 클러스터에는 여러 Connect 클러스터가 있을 수 있습니다. 다른 프로젝트에서 Managed Service for Apache Kafka 클러스터를 선택하는 경우 적절한 권한이 구성되어 있는지 확인하세요.

Connect 클러스터를 만든 후에는 다른 Kafka 클러스터로 업데이트할 수 없습니다.

지연 시간 및 네트워크 비용에 대한 리전 공동 배치 이점

Managed Service for Apache Kafka 및 Connect 클러스터를 동일한 리전에 공동 배치하면 지연 시간과 네트워크 비용이 줄어듭니다. 예를 들어 Apache Kafka용 관리형 서비스 클러스터가 region-a에 있고 싱크 커넥터를 사용하여 이 Apache Kafka용 관리형 서비스 클러스터 (소스)의 데이터를 region-a에도 있는 BigQuery 테이블 (싱크)에 쓴다고 가정해 보겠습니다. region-a에 Connect 클러스터를 배포하면 이 배포 선택으로 BigQuery 쓰기 작업의 지연 시간이 최소화되고 Managed Service for Apache Kafka 클러스터와 Connect 클러스터 간의 리전 간 네트워크 전송 비용이 절감됩니다.

다중 시스템 지연 시간 및 비용 고려사항

Kafka Connect는 커넥터를 사용하여 시스템 간에 데이터를 이동합니다. 커넥터의 한쪽은 항상 Managed Service for Apache Kafka 클러스터와 상호작용합니다. 단일 Kafka Connect 클러스터는 여러 커넥터를 실행할 수 있으며, 각 커넥터는 소스 (시스템에서 데이터를 가져옴) 또는 싱크 (시스템에 데이터를 푸시) 역할을 합니다.

Managed Service for Apache Kafka 클러스터와 동일한 리전에 있는 Connect 클러스터는 클러스터 간 통신 지연 시간이 짧다는 이점이 있지만 각 커넥터는 BigQuery 테이블이나 다른 Kafka 클러스터와 같은 다른 시스템과도 상호작용합니다. Connect 클러스터와 Managed Service for Apache Kafka 클러스터가 동일한 위치에 있더라도 다른 시스템은 다른 리전에 있을 수 있습니다. 이로 인해 지연 시간과 비용이 증가합니다. 전체 파이프라인 지연 시간은 Managed Service for Apache Kafka 클러스터, Connect 클러스터, 소스 또는 싱크 시스템 등 세 시스템의 위치에 따라 달라집니다.

예를 들어 Managed Service for Apache Kafka 클러스터가 region-a에 있고, Connect 클러스터가 region-b에 있으며, region-c의 버킷에 Cloud Storage 커넥터를 사용하는 경우 네트워크 홉 2개 (region-a에서 region-b로, region-b에서 region-c로 또는 커넥터 방향에 따라 그 반대)에 대한 요금이 청구됩니다.

지연 시간과 비용을 모두 최적화하려면 Connect 클러스터 배치를 계획할 때 관련된 모든 리전을 신중하게 고려하세요.

용량 구성

용량을 구성하려면 Connect 클러스터의 각 vCPU에 대한 vCPU 수와 메모리 양을 구성해야 합니다. 연결 클러스터를 만든 후 용량을 업데이트할 수 있습니다. 용량 구성의 속성은 다음과 같습니다.

  • vCPU: Connect 클러스터에 할당된 vCPU 수입니다. 최솟값은 vCPU 3개입니다.

  • 메모리: 각 vCPU에 할당된 메모리 양입니다. vCPU당 1GiB에서 8GiB 사이로 프로비저닝해야 합니다. 클러스터를 만든 후에는 이러한 한도 내에서 메모리 양을 늘리거나 줄일 수 있습니다.

    예를 들어 vCPU가 6개인 클러스터를 만드는 경우 클러스터에 할당할 수 있는 최소 메모리는 6GiB (vCPU당 1GiB)이고 최대 메모리는 48GiB (vCPU당 8GiB)입니다.

Connect 클러스터의 각 작업자에 할당된 vCPU와 메모리는 클러스터의 성능, 용량, 비용에 큰 영향을 미칩니다. vCPU와 메모리가 Connect 클러스터에 미치는 영향은 다음과 같습니다.

vCPU 수

  • Kafka Connect는 커넥터의 작업을 태스크로 나눕니다. 각 태스크는 데이터를 병렬로 처리할 수 있습니다. vCPU가 많을수록 더 많은 작업을 동시에 실행할 수 있으므로 처리량이 높아집니다.

  • vCPU를 늘리면 Connect 클러스터의 비용이 증가합니다.

메모리

  • Kafka Connect는 커넥터와 Managed Service for Apache Kafka 간에 데이터가 흐를 때 데이터를 버퍼링하기 위해 메모리를 사용합니다. 메모리가 클수록 버퍼가 커집니다. 메모리가 크면 특히 대량 데이터 스트림의 처리량을 개선할 수 있습니다. 매우 큰 메시지나 레코드를 처리하는 커넥터는 OutOfMemoryError 예외가 발생하지 않고 처리할 수 있는 충분한 메모리가 필요합니다.

  • 메모리가 많을수록 Connect 클러스터의 비용이 증가합니다.

  • 변환 로직을 많이 사용하는 경우 메모리 할당이 더 많이 필요합니다.

목표는 Connect 클러스터에 적합한 용량 구성을 선택하는 것입니다. 이를 위해서는 Connect 클러스터에서 처리할 수 있는 처리량을 이해해야 합니다.

작업자 (기본) 서브넷

작업자 서브넷(기본 서브넷이라고도 함)은 VPC 네트워크를 Connect 클러스터에 연결합니다. 이 서브넷을 사용하면 클러스터 작업자가 소비자 네트워크의 소스 및 싱크 엔드포인트(예: Managed Service for Apache Kafka 클러스터 또는 자체 호스팅 Kafka 클러스터)에 도달할 수 있습니다.

작업자 서브넷 구성에 관한 요구사항은 다음과 같습니다.

  • 작업자 서브넷은 필수입니다.

  • 서브넷은 Connect 클러스터와 동일한 리전에 있어야 합니다.

  • 서브넷은 연결된 서브넷 목록에 있는 기본 Kafka 클러스터 중 하나와 동일한 상위 VPC에 있어야 합니다.

  • 서브넷 CIDR 범위의 최소 크기는 /22 (1024개 주소)여야 합니다.

클러스터 작업자에는 Private Service Connect 인터페이스를 사용하여 작업자 서브넷의 IP 주소가 할당됩니다. 작업자는 서브넷의 VPC 네트워크에서 액세스할 수 있는 모든 네트워크 대상에 도달할 수 있으며 다음 요구사항을 충족해야 합니다.

  • 엔드포인트는 172.16.0.0/14 CIDR 범위에 속하지 않아야 합니다. 이 범위는 Managed Service for Apache Kafka Connect 내부용으로 예약되어 있습니다.
  • 방화벽 규칙이 트래픽을 허용해야 합니다. 네트워크 연결에 대한 보안 구성을 참고하세요.
  • 인터넷 트래픽의 경우 Cloud NAT를 구성해야 합니다. 예를 들어 MirrorMaker 커넥터가 인터넷을 통해 액세스할 수 있는 Kafka 클러스터에서 데이터를 복제하려면 Cloud NAT가 필요합니다.
  • 작업자 서브넷 VPC와 다른 VPC에 있는 Private Service Connect 엔드포인트에 액세스하려면 지원되는 소비자 구성 (예: NCC)을 사용해야 합니다. 자세한 내용은 엔드포인트를 통해 게시된 서비스 액세스 정보를 참고하세요.

변환 가능한 DNS 도메인

확인 가능한 DNS 도메인(DNS 도메인 이름이라고도 함)을 사용하면 소비자 VPC 네트워크의 DNS 주소를 테넌트 VPC에서 사용할 수 있습니다. 이를 통해 Connect 클러스터가 DNS 이름을 IP 주소로 변환하여 MirrorMaker 커넥터의 다른 Kafka 클러스터를 비롯한 다른 서비스와의 통신을 용이하게 할 수 있습니다.

해결 가능한 DNS 도메인의 경우 Managed Service for Apache Kafka 클러스터를 선택할 수 있습니다. 기본 Managed Service for Apache Kafka 클러스터의 부트스트랩 주소가 변환 가능한 DNS 도메인 목록에 자동으로 포함되므로 기본 Managed Service for Apache Kafka 클러스터의 DNS 도메인 이름을 구성하지 않아도 됩니다.

하지만 DNS 도메인을 수동으로 지정할 수도 있습니다. 외부 Kafka 클러스터를 선택하는 경우 이 작업이 필요합니다. 기본 Managed Service for Apache Kafka 클러스터의 DNS 도메인이 자동으로 포함됩니다. 다른 Kafka 클러스터에서는 여전히 DNS 도메인을 구성해야 합니다.

Secret Manager 리소스

일부 커넥터는 구성의 일부로 비밀번호와 같은 민감한 정보가 필요합니다. 이러한 유형의 데이터를 안전하게 관리하려면 Secret Manager에 데이터를 저장하고 Connect 클러스터에 보안 비밀에 대한 액세스 권한을 부여하면 됩니다.

Kafka Connect에서 Secret Manager 보안 비밀을 사용하려면 다음 단계를 따르세요.

  1. 관리형 Kafka 서비스 계정에 Secret Manager 보안 비밀 접근자(roles/secretmanager.secretAccessor) 역할을 부여합니다. 이 역할을 사용하면 Connect 클러스터가 보안 비밀에 액세스할 수 있습니다.

  2. 민감한 정보가 포함된 보안 비밀을 만듭니다. 자세한 내용은 보안 비밀 만들기를 참고하세요.

  3. Connect 클러스터를 만들거나 업데이트할 때 클러스터가 액세스할 수 있는 보안 비밀을 지정합니다. Connect 클러스터당 최대 32개의 보안 비밀을 지정할 수 있습니다.

보안 비밀은 클러스터 작업자에 파일로 마운트됩니다. 커넥터는 이러한 파일에 대한 읽기 전용 액세스 권한을 갖습니다. 커넥터를 만들 때 커넥터의 구성 속성은 보안 비밀을 참조할 수 있습니다.

  • 보안 비밀 파일의 경로를 참조하려면 다음 형식을 사용하세요.

    /var/secrets/PROJECT_NAME-SECRET_NAME-SECRET_VERSION
    

    예: ssl.truststore.location=/var/secrets/project1-truststore-1

  • 보안 비밀의 을 구성 값 (예: 비밀번호)으로 사용하려면 다음 형식을 사용하세요.

    ${directory:/var/secrets:PROJECT_NAME-SECRET_NAME-SECRET_VERSION}
    

    예: password=${directory:/var/secrets:project1-database_password-3}

다음을 바꿉니다.

  • PROJECT_NAME: Google Cloud 프로젝트의 이름입니다.
  • SECRET_NAME: 보안 비밀의 이름입니다.
  • SECRET_VERSION: 보안 비밀 버전입니다.

라벨

라벨은 구성 및 식별에 도움이 되는 키-값 쌍입니다. Connect 클러스터를 정리하는 데 도움이 됩니다. 각 Connect 클러스터에 라벨을 연결한 후 라벨을 기준으로 리소스를 필터링할 수 있습니다. 라벨의 예로는 environment:prod, application:web-app가 있습니다.

Connect 클러스터 만들기

클러스터를 만들기 전에 연결 클러스터 속성 문서를 검토하세요.

Connect 클러스터를 만드는 데 20~30분 정도 걸립니다.

콘솔

  1. Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.

    클러스터 연결로 이동

  2. 만들기를 클릭합니다.

  3. Connect 클러스터 이름 필드에 Connect 클러스터 이름을 입력합니다. 자세한 내용은 Managed Service for Apache Kafka 리소스 이름 지정 가이드라인을 참고하세요.

  4. 기본 Kafka 클러스터 목록에서 Managed Service for Apache Kafka 클러스터를 선택합니다. 자세한 내용은 기본 Kafka 클러스터를 참고하세요.

  5. 리전 목록에서 Connect 클러스터의 위치를 선택합니다. 위치를 선택하는 방법에 대한 자세한 내용은 기본 Kafka 클러스터를 참고하세요.

  6. 용량 구성 섹션에서 다음 필드의 값을 입력하거나 기본값을 유지합니다.

    • vCPUs 필드에 클러스터의 가상 CPU 수를 입력합니다.

    • 메모리 필드에 CPU당 메모리 양(GiB)을 입력합니다. 값은 CPU당 8GiB를 초과할 수 없습니다.

    자세한 내용은 용량 구성을 참고하세요.

  7. 네트워크 구성 섹션의 네트워크 목록에서 VPC 네트워크를 선택하거나 기본값을 그대로 둡니다. 이 목록은 기본 Kafka 클러스터를 선택하면 채워집니다.

  8. 작업자 서브넷 섹션의 서브넷 목록에서 서브넷을 선택하거나 기본값을 그대로 둡니다. 자세한 내용은 작업자 서브넷을 참고하세요. 서브넷을 선택하면 서브넷 URI 경로 필드가 자동으로 채워집니다.

  9. 선택사항: 변환 가능한 DNS 도메인을 추가합니다. 기본 Kafka 클러스터의 DNS 도메인이 변환 가능한 DNS 도메인으로 자동으로 추가됩니다. 추가 DNS 도메인을 지정하려면 다음을 수행하세요.

    1. 변환 가능한 DNS 도메인 섹션을 펼칩니다.

    2. Add DNS domain(DNS 도메인 추가)을 클릭합니다.

    3. 기존 Managed Service for Apache Kafka 클러스터의 DNS 도메인을 추가하려면 Kafka 클러스터 목록에서 클러스터를 선택합니다. 그렇지 않으면 DNS 도메인 필드에 DNS 도메인을 입력합니다.

    4. 완료를 클릭합니다.

  10. 선택사항: Secret Manager 리소스를 추가하려면 다음 단계를 따르세요.

    1. Secret Manager 리소스 섹션을 펼칩니다.

    2. 보안 비밀 리소스 추가를 클릭합니다.

    3. 보안 비밀 목록에서 보안 비밀을 선택합니다.

    4. 보안 비밀 버전 목록에서 보안 비밀 버전을 선택합니다.

    5. 완료를 클릭합니다.

  11. 선택사항: 프로젝트를 정리하기 위해 라벨을 추가합니다. 라벨을 추가하려면 다음 단계를 따르세요.

    1. 라벨 섹션을 펼칩니다.

    2. 라벨 추가를 클릭합니다.

    3. 필드에 라벨의 키를 입력합니다.

    4. 필드에 라벨 값을 입력합니다.

  12. 만들기를 클릭합니다.

gcloud

  1. Google Cloud 콘솔에서 Cloud Shell을 활성화합니다.

    Cloud Shell 활성화

    Google Cloud 콘솔 하단에 Cloud Shell 세션이 시작되고 명령줄 프롬프트가 표시됩니다. Cloud Shell은 Google Cloud CLI가 사전 설치된 셸 환경으로, 현재 프로젝트의 값이 이미 설정되어 있습니다. 세션이 초기화되는 데 몇 초 정도 걸릴 수 있습니다.

  2. gcloud managed-kafka connect-clusters create 명령어를 실행합니다.

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    다음을 바꿉니다.

    • CONNECT_CLUSTER_ID: Connect 클러스터의 ID 또는 이름입니다. Connect 클러스터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. Connect 클러스터의 이름은 변경할 수 없습니다.

    • LOCATION: Connect 클러스터를 만드는 위치입니다. 지원되는 Google Cloud리전이어야 합니다. 생성 후에는 Connect 클러스터의 위치를 변경할 수 없습니다. 사용 가능한 위치 목록은 Managed Service for Apache Kafka 위치를 참고하세요. 위치 추천에 대한 자세한 내용은 기본 Kafka 클러스터를 참고하세요.

    • CPU: Connect 클러스터의 vCPU 수입니다. 최솟값은 vCPU 3개입니다. vCPU 수를 참고하세요.

    • MEMORY: Connect 클러스터의 메모리 양입니다. 'MB', 'MiB', 'GB', 'GiB', 'TB', 'TiB' 단위를 사용하세요. 예를 들어 '3GiB'입니다. vCPU당 1GiB에서 8GiB 사이로 프로비저닝해야 합니다. 메모리를 참고하세요.

    • WORKER_SUBNET: Connect 클러스터의 작업자 서브넷입니다.

      서브넷의 형식은 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID입니다.

      작업자 서브넷은 Connect 클러스터와 동일한 리전에 있어야 합니다.

    • PROJECT_ID: (선택사항)Google Cloud 프로젝트의 ID입니다. 제공되지 않으면 현재 프로젝트가 사용됩니다.

    • KAFKA_CLUSTER: Connect 클러스터와 연결된 기본 Managed Service for Apache Kafka 클러스터의 ID 또는 정규화된 이름입니다. Kafka 클러스터를 참고하세요. Kafka 클러스터의 형식은 projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID입니다.

      Connect 클러스터를 만든 후에는 다른 Kafka 클러스터로 업데이트할 수 없습니다.

    • SECRET: (선택사항) 작업자에 로드할 보안 비밀입니다. Secret Manager의 정확한 보안 비밀 버전을 제공해야 합니다. 별칭은 지원되지 않습니다. 하나의 클러스터에 최대 32개의 보안 비밀을 로드할 수 있습니다. 형식: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME: (선택사항) Connect Cluster에 표시할 서브넷의 DNS 도메인 이름입니다. Connect 클러스터는 IP 주소에 의존하는 대신 도메인 이름을 사용하여 리소스에 액세스할 수 있습니다. DNS 피어링을 참고하세요.

    • LABELS: (선택사항) 클러스터와 연결할 라벨입니다. 라벨 형식에 대한 자세한 내용은 라벨을 참고하세요. 추가할 라벨 KEY=VALUE 쌍의 목록입니다. 키는 소문자로 시작해야 하고 하이픈 (-), 밑줄 (_), 소문자, 숫자만 포함할 수 있습니다. 값은 하이픈(-), 밑줄(_), 소문자, 숫자만 포함해야 합니다.

    • CONFIG_FILE: (선택사항) 클러스터 또는 커넥터 기본값에서 재정의된 구성이 포함된 JSON 또는 YAML 파일의 경로입니다. 이 파일은 인라인 JSON 또는 YAML도 지원합니다.

    • --async: (선택사항) 진행 중인 작업이 완료될 때까지 기다리지 않고 즉시 반환합니다. --async 플래그를 사용하면 클러스터가 백그라운드에서 생성되는 동안 다른 작업을 계속할 수 있습니다. 플래그를 사용하지 않으면 시스템은 작업이 완료될 때까지 기다린 후 응답을 반환합니다. 다른 작업을 계속하려면 클러스터가 완전히 업데이트될 때까지 기다려야 합니다.

    다음과 비슷한 응답이 표시됩니다.

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    OPERATION_ID를 저장하여 진행 상황을 추적합니다. 예를 들어 여기의 값은 operation-1753590328249-63ae19098cc06-64300a0a-06512d02입니다.

Terraform

Terraform 리소스를 사용하여 Connect 클러스터를 만들 수 있습니다.

resource "google_managed_kafka_connect_cluster" "default" {
  provider           = google-beta
  project            = data.google_project.default.project_id
  connect_cluster_id = "my-connect-cluster-id"
  location           = "us-central1"
  kafka_cluster      = google_managed_kafka_cluster.default.id
  capacity_config {
    vcpu_count   = 12
    memory_bytes = 12884901888 # 12 GiB
  }
  gcp_config {
    access_config {
      network_configs {
        primary_subnet = google_compute_subnetwork.default.id
      }
    }
  }
}

Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.

Go

이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Managed Service for Apache Kafka Go API 참고 문서를 참고하세요.

Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-connect-cluster"
	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)

	// Capacity configuration with 12 vCPU and 12 GiB RAM
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   12,
		MemoryBytes: 12884901888, // 12 GiB in bytes
	}

	// Optionally, you can also specify accessible subnets and resolvable DNS
	// domains as part of your network configuration. For example:
	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
	// 	{
	// 		PrimarySubnet:      primarySubnet,
	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
	// 	},
	// }

	connectCluster := &managedkafkapb.ConnectCluster{
		Name:           clusterPath,
		KafkaCluster:   kafkaCluster,
		CapacityConfig: capacityConfig,
	}

	req := &managedkafkapb.CreateConnectClusterRequest{
		Parent:           locationPath,
		ConnectClusterId: clusterID,
		ConnectCluster:   connectCluster,
	}
	op, err := client.CreateConnectCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
	return nil
}

자바

이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Managed Service for Apache Kafka Java API 참고 문서를 참고하세요.

Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
import com.google.cloud.managedkafka.v1.ConnectCluster;
import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateConnectCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
    int cpu = 12;
    long memoryBytes = 12884901888L; // 12 GiB
    createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
  }

  public static void createConnectCluster(
      String projectId,
      String region,
      String clusterId,
      String subnet,
      String kafkaCluster,
      int cpu,
      long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
        .setMemoryBytes(memoryBytes).build();
    ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
        .setPrimarySubnet(subnet)
        .build();
    // Optionally, you can also specify additional accessible subnets and resolvable
    // DNS domains as part of your network configuration. For example:
    // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
    // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
    ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
        .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
        .build();
    ConnectCluster connectCluster = ConnectCluster.newBuilder()
        .setCapacityConfig(capacityConfig)
        .setGcpConfig(gcpConfig)
        .setKafkaCluster(kafkaCluster)
        .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createConnectClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
        .create(settingsBuilder.build())) {
      CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
          .setParent(LocationName.of(projectId, region).toString())
          .setConnectClusterId(clusterId)
          .setConnectCluster(connectCluster)
          .build();

      // The duration of this operation can vary considerably, typically taking
      // between 10-30 minutes.
      OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
          .createConnectClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone
      // = True)
      ConnectCluster response = future.get();
      System.out.printf("Created connect cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
          e.getMessage());
      throw e;
    }
  }
}

Python

이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Managed Service for Apache Kafka Python API 참고 문서를 참고하세요.

Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig

# TODO(developer): Update with your values.
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# kafka_cluster_id = "my-kafka-cluster"
# primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 12
# memory_bytes = 12884901888  # 12 GiB

connect_client = ManagedKafkaConnectClient()
kafka_client = managedkafka_v1.ManagedKafkaClient()

parent = connect_client.common_location_path(project_id, region)
kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)

connect_cluster = ConnectCluster()
connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
connect_cluster.kafka_cluster = kafka_cluster_path
connect_cluster.capacity_config.vcpu_count = cpu
connect_cluster.capacity_config.memory_bytes = memory_bytes
connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
# Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
# For example:
# connect_cluster.gcp_config.access_config.network_configs = [
#     ConnectNetworkConfig(
#         primary_subnet=primary_subnet,
#         additional_subnets=additional_subnets,
#         dns_domain_names=dns_domain_names,
#     )
# ]

request = CreateConnectClusterRequest(
    parent=parent,
    connect_cluster_id=connect_cluster_id,
    connect_cluster=connect_cluster,
)

try:
    operation = connect_client.create_connect_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # Creating a Connect cluster can take 10-40 minutes.
    response = operation.result(timeout=3000)
    print("Created Connect cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

클러스터 생성 작업 모니터링

Connect 클러스터를 만들기 위해 gcloud CLI를 실행한 경우에만 다음 명령어를 실행할 수 있습니다.

  • Connect 클러스터를 만드는 데는 일반적으로 20~30분이 걸립니다. 클러스터 생성 진행 상황을 추적하기 위해 gcloud managed-kafka connect-clusters create 명령어는 장기 실행 작업 (LRO)을 사용하며, 다음 명령어를 사용하여 모니터링할 수 있습니다.

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    다음을 바꿉니다.

    • OPERATION_ID를 이전 섹션의 작업 ID 값으로 바꿉니다.
    • LOCATION을 이전 섹션의 위치 값으로 바꿉니다.

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.