MirrorMaker 2.0 커넥터 만들기

MirrorMaker 2.0은 Kafka 클러스터 간에 주제를 복제하는 도구입니다. 다음 MirrorMaker 2.0 커넥터를 만들 수 있습니다.

  • MirrorMaker 2.0 소스

  • MirrorMaker 2.0 체크포인트

  • MirrorMaker 2.0 하트비트

MirrorMaker 2.0 소스 커넥터는 항상 필요합니다. 소스에서 대상 클러스터로 데이터를 미러링하기 때문입니다. ACL도 동기화합니다. MirrorMaker 2.0 체크포인트 및 하트비트 커넥터는 선택사항입니다. 소스 커넥터를 만들지 않고 MirrorMaker 2.0 체크포인트 및 하트비트 커넥터를 만들 수도 있습니다.

이러한 커넥터에 대한 자세한 내용은 커넥터 개요를 참고하세요.

MirrorMaker 2.0의 클러스터 역할 이해

MirrorMaker 2.0을 구성할 때는 Kafka 클러스터가 수행하는 다양한 역할을 이해하는 것이 중요합니다.

  • 기본 클러스터: Managed Service for Apache Kafka 컨텍스트에서 이는 Kafka Connect 클러스터가 직접 연결된 Managed Service for Apache Kafka 클러스터입니다. Connect 클러스터는 MirrorMaker 2.0 커넥터 인스턴스를 호스팅합니다.

  • 보조 클러스터: 복제에 참여하는 다른 Kafka 클러스터입니다. 다른 Managed Service for Apache Kafka 클러스터 또는 외부 클러스터일 수 있습니다. Compute Engine, GKE, 온프레미스 또는 다른 클라우드에서 자체 관리하는 경우가 그 예입니다.

  • 소스 클러스터: MirrorMaker 2.0이 데이터를 복제하는 Kafka 클러스터입니다.

  • 타겟 클러스터: MirrorMaker 2.0이 데이터를 복제하는 Kafka 클러스터입니다.

기본 클러스터는 소스 또는 타겟으로 사용할 수 있습니다.

  • 기본 클러스터가 소스인 경우 보조 클러스터는 타겟입니다. 데이터는 기본 클러스터에서 보조 클러스터 흐릅니다.

  • 기본 클러스터가 타겟인 경우 보조 클러스터는 소스입니다. 데이터는 보조 클러스터에서 기본 클러스터 흐릅니다.

쓰기 작업의 지연 시간을 최소화하려면 타겟 클러스터를 기본 클러스터로 지정하고 Connect 클러스터를 타겟 클러스터와 동일한 리전에 배치하는 것이 좋습니다.

커넥터의 모든 속성을 올바르게 구성해야 합니다. 여기에는 보조 클러스터를 대상으로 하는 생산자 인증 속성도 포함됩니다. 잠재적인 문제에 관한 자세한 내용은 MirrorMaker 2.0 클라이언트 구성 개선을 참고하세요.

시작하기 전에

MirrorMaker 2.0 커넥터를 만들려면 다음 작업을 완료하세요.

  • Apache Kafka용 관리형 서비스 클러스터 (기본)를 만듭니다. 이 클러스터는 MirrorMaker 2.0 커넥터의 엔드포인트 중 하나로 사용됩니다.

  • 보조 Kafka 클러스터를 만듭니다. 이 클러스터는 다른 엔드포인트 역할을 합니다. 다른 Managed Service for Apache Kafka 클러스터이거나 외부 또는 자체 관리형 Kafka 클러스터일 수 있습니다. 여러 Kafka 클러스터를 Connect 클러스터의 보조 Kafka 클러스터로 구성할 수 있습니다.

  • MirrorMaker 2.0 커넥터를 호스팅하는 Connect 클러스터를 만듭니다.

  • 보조 Kafka 클러스터의 DNS 도메인이 구성되어 있는지 확인합니다.

  • Private Service Connect 인터페이스가 소스 Kafka 클러스터와 타겟 Kafka 클러스터에 모두 도달할 수 있도록 방화벽 규칙을 구성합니다.

  • 인터넷을 통해 소스 또는 타겟 Kafka 클러스터에 액세스하는 경우 Connect 작업자가 인터넷에 액세스할 수 있도록 Cloud NAT를 구성합니다.

  • 보조 클러스터에 외부 또는 자체 관리형 Kafka 클러스터가 포함된 경우 필요한 사용자 인증 정보가 보안 비밀 리소스로 구성되어 있는지 확인합니다.

네트워킹 요구사항에 대한 자세한 내용은 작업자 서브넷을 참고하세요.

필수 역할 및 권한

MirrorMaker 2.0 커넥터를 만드는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 커넥터 편집자 (roles/managedkafka.connectorEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 MirrorMaker 2.0 커넥터를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

MirrorMaker 2.0 커넥터를 만들려면 다음 권한이 필요합니다.

  • 상위 연결 클러스터에 커넥터 생성 권한을 부여합니다. managedkafka.connectors.create

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

관리형 Kafka 커넥터 편집자 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.

다른 프로젝트에서 MirrorMaker 2.0 커넥터 만들기

기본 Apache Kafka용 관리형 서비스 클러스터가 MirrorMaker 2.0 커넥터를 실행하는 Connect 클러스터와 다른 프로젝트에 있는 경우 다른 프로젝트에서 Connect 클러스터 만들기를 참고하세요.

자체 관리형 보조 Kafka 클러스터에 연결

자체 관리형 보조 Kafka 클러스터에 연결할 때는 네트워킹 및 인증에 유의하세요.

  • 네트워킹: Connect 클러스터 VPC 네트워크와 자체 관리 또는 외부 클러스터를 호스팅하는 네트워크 간의 연결을 허용하도록 적절한 VPC 네트워크 설정과 방화벽 규칙이 구성되어 있는지 확인합니다.

  • VPC 내 클러스터의 경우 VPC 네트워크 만들기 및 관리를 참고하세요.

  • 온프레미스 또는 다른 클라우드 환경에 연결하려면 Cloud VPN 또는 Cloud Interconnect와 같은 솔루션을 고려하세요. 온프레미스 Kafka에 연결에 관한 구체적인 안내도 참고하세요.

  • 인증 및 암호화: Connect 클러스터는 자체 관리형 또는 외부 클러스터 (필요한 경우)로 인증해야 하며 TLS 암호화를 처리해야 합니다. Kafka 인증에 대한 일반적인 정보는 Apache Kafka 보안 문서를 참고하세요.

사용자 인증 정보에 Secret Manager 사용

연결 클러스터는 Secret Manager와 직접 통합됩니다. 비밀번호, 자체 관리 또는 외부 클러스터에 연결하는 데 필요한 신뢰 저장소 및 키 저장소 콘텐츠와 같은 모든 민감한 구성 값을 Secret Manager의 보안 비밀로 저장합니다.

  • Connect 클러스터 서비스 계정에 부여된 보안 비밀은 커넥터의 런타임 환경 내에서 /var/secrets/ 디렉터리 아래에 파일로 자동 마운트됩니다.

  • 파일 이름은 {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION} 패턴을 따릅니다. 프로젝트 번호가 아닌 프로젝트 이름을 사용해야 합니다.

  • 보안 비밀을 참조하는 방법은 Kafka 속성이 보안 비밀 password를 예상하는지 아니면 파일의 path를 예상하는지에 따라 다릅니다.

    • 비밀번호의 경우 Kafka DirectoryConfigProvider 속성을 사용합니다. ${directory:/var/secrets}:{SECRET_FILENAME} 형식으로 값을 지정합니다. 예: password=${directory:/var/secrets}:my-project-db-password-1

    • 파일 경로의 경우 마운트된 보안 비밀 파일의 직접 경로를 지정합니다. 예: ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

클러스터 생성 중 액세스 권한을 부여하고 보안 비밀을 구성하는 방법에 대한 자세한 내용은 Secret Manager 보안 비밀 구성을 참고하세요.

MirrorMaker 소스 커넥터 작동 방식

MirrorMaker 소스 커넥터는 소스 클러스터의 하나 이상의 Kafka 주제에서 데이터를 가져와 ACL과 함께 대상 클러스터의 주제로 데이터를 복제합니다.

MirrorMaker 소스 커넥터가 데이터를 복제하는 방식에 관한 자세한 내용은 다음을 참고하세요.

  • 커넥터는 소스 클러스터 내의 지정된 Kafka 주제에서 메시지를 소비합니다. 쉼표로 구분된 주제 이름 또는 단일 Java 스타일 정규 표현식을 허용하는 topics 구성 속성을 사용하여 복제할 주제를 지정합니다. 예를 들면 topic-a,topic-b 또는 my-prefix-.*입니다.

  • 커넥터는 topics.exclude 속성을 사용하여 지정한 특정 주제의 복제를 건너뛸 수도 있습니다. 제외가 포함보다 우선합니다.

  • 커넥터는 사용된 메시지를 타겟 클러스터에 씁니다.

  • 커넥터에는 source.cluster.bootstrap.serverstarget.cluster.bootstrap.servers과 같은 소스 및 타겟 클러스터 연결 세부정보가 필요합니다.

  • 커넥터에는 source.cluster.aliastarget.cluster.alias에 지정된 대로 소스 및 타겟 클러스터의 별칭도 필요합니다. 기본적으로 복제된 주제는 소스 별칭을 사용하여 자동으로 이름이 바뀝니다. 예를 들어 별칭이 primary인 소스의 orders라는 주제는 타겟에서 primary.orders가 됩니다.

  • 복제된 주제와 연결된 ACL도 소스에서 타겟 클러스터로 동기화됩니다. sync.topic.acls.enabled 속성을 사용하여 사용 중지할 수 있습니다.

  • 소스 클러스터와 타겟 클러스터 모두에 연결하기 위한 인증 세부정보는 클러스터에서 필요한 경우 구성에 제공해야 합니다. 소스에는 source.cluster., 타겟에는 target.cluster.이 접두사로 지정된 security.protocol, sasl.mechanism, sasl.jaas.config과 같은 속성을 구성해야 합니다.

  • 커넥터는 내부 주제를 사용합니다. offset-syncs.topic.replication.factor와 같은 이러한 속성을 구성해야 할 수 있습니다.

  • 커넥터는 Kafka 레코드 변환기 key.converter, value.converter, header.converter를 사용합니다. 직접 복제의 경우 이러한 값은 변환을 실행하지 않는 (통과) org.apache.kafka.connect.converters.ByteArrayConverter로 기본 설정되는 경우가 많습니다.

  • tasks.max 속성은 커넥터의 동시 로드 수준을 제어합니다. tasks.max를 늘리면 처리량이 향상될 수 있지만 효과적인 병렬 처리는 복제되는 소스 Kafka 주제의 파티션 수에 의해 제한되는 경우가 많습니다.

MirrorMaker 2.0 커넥터의 속성

MirrorMaker 2.0 커넥터를 만들거나 업데이트할 때 다음 속성을 지정합니다.

커넥터 이름

커넥터의 이름 또는 ID입니다. 리소스 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 이름은 변경할 수 없습니다.

커넥터 유형

커넥터 유형은 다음 중 하나여야 합니다.

기본 Kafka 클러스터

Managed Service for Apache Kafka 클러스터입니다. 이 필드는 시스템에서 자동으로 채웁니다.

  • 기본 Kafka 클러스터를 대상 클러스터로 사용: 다른 Kafka 클러스터에서 기본 Managed Service for Apache Kafka 클러스터로 데이터를 이동하려면 이 옵션을 선택합니다.

  • 기본 Kafka 클러스터를 소스 클러스터로 사용: 기본 Managed Service for Apache Kafka 클러스터에서 다른 Kafka 클러스터로 데이터를 이동하려면 이 옵션을 선택합니다.

타겟 또는 소스 클러스터

파이프라인의 다른 쪽 끝을 형성하는 보조 Kafka 클러스터입니다.

  • Apache Kafka용 관리형 서비스 클러스터: 드롭다운 메뉴에서 클러스터를 선택합니다.

  • 자체 관리형 또는 외부 Kafka 클러스터: hostname:port_number 형식으로 부트스트랩 주소를 입력합니다. 예를 들면 kafka-test:9092입니다.

주제 이름 또는 정규 표현식

복제할 주제입니다. 개별 이름 (topic1, topic2)을 지정하거나 정규식 (topic.*)을 사용합니다. 이 속성은 MirrorMaker 2.0 소스 커넥터에 필요합니다. 기본값은 .*입니다.

소비자 그룹 이름 또는 정규 표현식

복제할 소비자 그룹입니다. 개별 이름 (group1, group2)을 지정하거나 정규 표현식 (group.*)을 사용합니다. 이 속성은 MirrorMaker 2.0 체크포인트 커넥터에 필요합니다. 기본값은 .*입니다.

구성

이 섹션에서는 MirrorMaker 2.0 커넥터의 커넥터별 추가 구성 속성을 지정할 수 있습니다.

Kafka 주제의 데이터는 Avro, JSON, 원시 바이트 등 다양한 형식일 수 있으므로 구성의 핵심 부분은 변환기를 지정하는 것입니다. 변환기는 Kafka 주제에 사용된 형식의 데이터를 Kafka Connect의 표준화된 내부 형식으로 변환합니다.

Kafka Connect의 변환기 역할, 지원되는 변환기 유형, 일반적인 구성 옵션에 관한 일반적인 내용은 변환기를 참고하세요.

모든 MirrorMaker 2.0 커넥터의 일반적인 구성은 다음과 같습니다.

  • source.cluster.alias: 소스 클러스터의 별칭입니다.

  • target.cluster.alias: 대상 클러스터의 별칭입니다.

데이터를 복제할 때 특정 리소스를 제외하는 데 사용되는 구성:

  • topics.exclude: 제외된 주제입니다. 쉼표로 구분된 주제 이름과 정규식을 지원합니다. 제외가 포함보다 우선합니다. MirrorMaker 2.0 소스 커넥터에 사용됩니다. 기본값은 mm2.*.internal,.*.replica,__.*입니다.

  • groups.exclude: 그룹을 제외합니다. 쉼표로 구분된 그룹 ID와 정규식을 지원합니다. 제외가 포함보다 우선합니다. MirrorMaker 2.0 체크포인트 커넥터에 사용됩니다. 기본값은 console-consumer-.*,connect-.*,__.*입니다.

MirrorMaker 2.0 커넥터에는 인증 구성이 필요합니다.

소스 또는 타겟 Kafka 클러스터가 Apache Kafka용 관리형 서비스 클러스터인 경우 Connect 클러스터는 OAuthBearer를 사용하여 인증합니다. 인증 구성은 사전 구성되어 있으므로 구성을 수동으로 설정할 필요가 없습니다.

자체 관리형 또는 온프레미스 Kafka 클러스터의 경우 인증 구성은 Kafka 클러스터에서 지원하는 인증 메커니즘에 따라 달라집니다. 소스 Kafka 클러스터 구성의 인증 구성 예시는 다음과 같습니다.

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

타겟 Kafka 클러스터 구성의 인증 구성 예시는 다음과 같습니다.

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

사용 가능한 구성 속성은 특정 커넥터에 따라 다릅니다. 지원되는 MirrorMaker 2.0 커넥터 버전을 확인하여 지원되는 구성 예시를 확인하세요. 다음 문서를 참고하세요.

Kafka 레코드 변환

Kafka Connect는 키와 값의 기본 변환기로 org.apache.kafka.connect.converters.ByteArrayConverter을 사용하며, 이는 변환을 실행하지 않는 패스스루 옵션을 제공합니다.

다른 변환기를 사용하도록 header.converter, key.converter, value.converter를 구성할 수 있습니다.

태스크 수

tasks.max 값은 Kafka Connect가 MirrorMaker 커넥터를 실행하는 데 사용하는 최대 태스크 수를 구성합니다. 커넥터의 병렬 처리 수준을 제어합니다. 작업 수를 늘리면 처리량이 증가할 수 있지만 Kafka 주제 파티션 수와 같은 요인에 의해 제한됩니다.

MirrorMaker 2.0 소스 커넥터 만들기

커넥터를 만들기 전에 커넥터 속성 문서를 검토하세요.

콘솔

  1. Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.

    클러스터 연결로 이동

  2. 커넥터를 만들려는 Connect 클러스터를 클릭합니다.

    클러스터 연결 세부정보 페이지가 표시됩니다.

  3. 커넥터 만들기를 클릭합니다.

    Kafka 커넥터 만들기 페이지가 표시됩니다.

  4. 커넥터 이름에 문자열을 입력합니다.

    커넥터 이름을 지정하는 방법에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요.

  5. 커넥터 플러그인에서 'MirrorMaker 2.0 소스'를 선택합니다.

  6. 기본 Kafka 클러스터에서 다음 옵션 중 하나를 선택합니다.

    • 기본 Kafka 클러스터를 소스 클러스터로 사용: Apache Kafka용 관리형 서비스 클러스터에서 데이터를 이동합니다.
    • 기본 Kafka 클러스터를 대상 클러스터로 사용: Apache Kafka용 관리형 서비스 클러스터로 데이터를 이동합니다.
  7. 타겟 클러스터 또는 소스 클러스터에서 다음 옵션 중 하나를 선택합니다.

    • Managed Service for Apache Kafka 클러스터: 메뉴에서 선택합니다.
    • 자체 관리형 또는 외부 Kafka 클러스터: hostname:port_number 형식으로 부트스트랩 주소를 입력합니다.
  8. 쉼표로 구분된 주제 이름 또는 주제 정규식을 입력합니다.

  9. 필수 보안 설정을 비롯한 구성을 검토하고 조정합니다.

    구성 및 인증에 대한 자세한 내용은 구성을 참고하세요.

  10. 작업 재시작 정책을 선택합니다. 자세한 내용은 작업 다시 시작 정책을 참고하세요.

  11. 만들기를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka connectors create 명령어를 실행합니다.

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    다음을 바꿉니다.

    • CONNECTOR_ID: 커넥터의 ID 또는 이름입니다. 커넥터 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 커넥터의 이름은 변경할 수 없습니다.

    • LOCATION: 커넥터를 만드는 위치입니다. 이 위치는 Connect 클러스터를 만든 위치와 동일해야 합니다.

    • CONNECT_CLUSTER_ID: 커넥터가 생성된 Connect 클러스터의 ID입니다.

    • CONFIG_FILE: BigQuery 싱크 커넥터의 YAML 구성 파일 경로입니다.

    다음은 MirrorMaker 2.0 소스 커넥터의 구성 파일 예시입니다.

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  3. Terraform

    Terraform 리소스를 사용하여 커넥터를 만들 수 있습니다.

    # A single MirrorMaker 2 Source Connector to replicate from one source to one target.
    resource "google_managed_kafka_connector" "default" {
      project         = data.google_project.default.project_id
      connector_id    = "mm2-source-to-target-connector-id"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        "name"                 = "mm2-source-to-target-connector-id"
        "tasks.max"            = "3"
        "source.cluster.alias" = "source"
        "target.cluster.alias" = "target"
        "topics"               = ".*" # Replicate all topics from the source
        # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
        # for one or more Kafka brokers in the source/target cluster.
        "source.cluster.bootstrap.servers" = "source_cluster_dns"
        "target.cluster.bootstrap.servers" = "target_cluster_dns"
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
        "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
      }
    
      provider = google-beta
    }

    Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.

    Go

    이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
    func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "mm2-source-to-target-connector-id"
    	// sourceBootstrapServers := "source_cluster_dns"
    	// targetBootstrapServers := "target_cluster_dns"
    	// tasksMax := "3"
    	// sourceClusterAlias := "source"
    	// targetClusterAlias := "target"
    	// topics := ".*"
    	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    		"name":                 connectorID,
    		"tasks.max":            tasksMax,
    		"source.cluster.alias": sourceClusterAlias,
    		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
    		// Replicate all topics from the source
    		"topics": topics,
    		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    		// the source/target cluster.
    		// For example: "kafka-broker:9092"
    		"source.cluster.bootstrap.servers": sourceBootstrapServers,
    		"target.cluster.bootstrap.servers": targetBootstrapServers,
    		// You can define an exclusion policy for topics as follows:
    		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    		"topics.exclude": topicsExclude,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
    	return nil
    }
    

    자바

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateMirrorMaker2SourceConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String maxTasks = "3";
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-mirrormaker2-connector";
        String sourceClusterBootstrapServers = "my-source-cluster:9092";
        String targetClusterBootstrapServers = "my-target-cluster:9092";
        String sourceClusterAlias = "source";
        String targetClusterAlias = "target"; // This is usually the primary cluster.
        String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
        String topics = ".*";
        // You can define an exclusion policy for topics as follows:
        // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        String topicsExclude = "mm2.*.internal,.*.replica,__.*";
        createMirrorMaker2SourceConnector(
            projectId,
            region,
            maxTasks,
            connectClusterId,
            connectorId,
            sourceClusterBootstrapServers,
            targetClusterBootstrapServers,
            sourceClusterAlias,
            targetClusterAlias,
            connectorClass,
            topics,
            topicsExclude);
      }
    
      public static void createMirrorMaker2SourceConnector(
          String projectId,
          String region,
          String maxTasks,
          String connectClusterId,
          String connectorId,
          String sourceClusterBootstrapServers,
          String targetClusterBootstrapServers,
          String sourceClusterAlias,
          String targetClusterAlias,
          String connectorClass,
          String topics,
          String topicsExclude)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("source.cluster.alias", sourceClusterAlias);
        configMap.put("target.cluster.alias", targetClusterAlias);
        configMap.put("topics", topics);
        configMap.put("topics.exclude", topicsExclude);
        configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
        configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "source.cluster.alias": source_cluster_alias,
        "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
        # Replicate all topics from the source
        "topics": topics,
        # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
        # the source/target cluster.
        # For example: "kafka-broker:9092"
        "source.cluster.bootstrap.servers": source_bootstrap_servers,
        "target.cluster.bootstrap.servers": target_bootstrap_servers,
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        "topics.exclude": topics_exclude,
    }
    
    connector = Connector()
    # The name of the connector.
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.