Kafka 명령줄 도구로 메시지 생성 및 소비

Kafka 명령줄 도구를 사용하여 Apache Kafka용 관리형 서비스 클러스터에 연결하고, 메시지를 생성하고, 메시지를 사용하는 방법을 알아봅니다.

시작하기 전에

이 튜토리얼을 시작하기 전에 Apache Kafka용 관리형 서비스 클러스터를 새로 만드세요. 클러스터가 이미 있으면 이 단계를 건너뛸 수 있습니다.

클러스터를 만드는 방법

콘솔

  1. Managed Service for Apache Kafka > 클러스터 페이지로 이동합니다.

    클러스터로 이동

  2. 만들기를 클릭합니다.
  3. 클러스터 이름 체크박스에 클러스터 이름을 입력합니다.
  4. 리전 목록에서 클러스터의 위치를 선택합니다.
  5. 네트워크 구성에서 클러스터에 액세스할 수 있는 서브넷을 구성합니다.
    1. 프로젝트에서 사용자 프로젝트를 선택합니다.
    2. 네트워크에서 VPC 네트워크를 선택합니다.
    3. 서브넷에서 서브넷을 선택합니다.
    4. 완료를 클릭합니다.
  6. 만들기를 클릭합니다.

만들기를 클릭하면 클러스터 상태가 Creating가 됩니다. 클러스터가 준비되면 상태는 Active입니다.

gcloud

Kafka 클러스터를 만들려면 managed-kafka clusters create 명령어를 실행합니다.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

다음을 바꿉니다.

  • KAFKA_CLUSTER: Kafka 클러스터의 이름
  • REGION: 클러스터의 위치
  • PROJECT_ID: 프로젝트 ID입니다.
  • SUBNET_NAME: 클러스터를 만들려는 서브넷입니다(예: default).

지원되는 위치에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 위치를 참고하세요.

이 명령어는 비동기적으로 실행되고 작업 ID를 반환합니다.

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

생성 작업의 진행 상황을 추적하려면 gcloud managed-kafka operations describe 명령어를 사용합니다.

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

클러스터가 준비되면 이 명령어의 출력에 state: ACTIVE 항목이 포함됩니다. 자세한 내용은 클러스터 생성 작업 모니터링을 참고하세요.

필요한 역할

클라이언트 VM을 만들고 구성하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

클라이언트 VM 만들기

Kafka 클러스터에 액세스할 수 있는 Compute Engine의 Linux 가상 머신 (VM) 인스턴스를 만듭니다. VM을 구성할 때 다음 옵션을 설정합니다.

  • Region 사용). Kafka 클러스터와 동일한 리전에 VM을 만듭니다.

  • 서브넷. Kafka 클러스터 구성에서 사용한 서브넷과 동일한 VPC 네트워크에 VM을 만듭니다. 자세한 내용은 클러스터의 서브넷 보기를 참고하세요.

  • 액세스 범위 VM에 https://www.googleapis.com/auth/cloud-platform 액세스 범위를 할당합니다. 이 범위는 VM이 관리형 Kafka API에 요청을 보낼 수 있도록 승인합니다.

다음 단계에서는 이러한 옵션을 설정하는 방법을 보여줍니다.

콘솔

  1. Google Cloud 콘솔에서 인스턴스 만들기 페이지로 이동합니다.

    인스턴스 만들기

  2. 머신 구성 창에서 다음을 수행합니다.

    1. 이름 필드에 인스턴스 이름을 지정합니다. 자세한 내용은 리소스 이름 지정 규칙을 참조하세요.

    2. 리전 목록에서 Kafka 클러스터와 동일한 리전을 선택합니다.

    3. 영역 목록에서 영역을 선택합니다.

  3. 탐색 메뉴에서 네트워킹을 클릭합니다. 네트워킹 창이 표시되면 다음을 수행합니다.

    1. 네트워크 인터페이스 섹션으로 이동합니다.

    2. 기본 네트워크 인터페이스를 펼치려면 화살표를 클릭합니다.

    3. 네트워크 필드에서 VPC 네트워크를 선택합니다.

    4. 서브네트워크 목록에서 서브넷을 선택합니다.

    5. 완료를 클릭합니다.

  4. 탐색 메뉴에서 보안을 클릭합니다. 보안 창이 표시되면 다음을 수행합니다.

    1. 액세스 범위에서 각 API에 액세스 설정을 선택합니다.

    2. 액세스 범위 목록에서 Cloud Platform 드롭다운 목록을 찾아 사용 설정됨을 선택합니다.

  5. 만들기를 클릭하여 VM을 만듭니다.

gcloud

VM 인스턴스를 만들려면 gcloud compute instances create 명령어를 사용합니다.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

다음을 바꿉니다.

  • VM_NAME: VM의 이름
  • PROJECT_ID: 프로젝트 ID입니다.
  • REGION: Kafka 클러스터를 만든 리전입니다(예: us-central1).
  • SUBNET: 클러스터 구성에서 사용한 서브넷과 동일한 VPC 네트워크의 서브넷
  • ZONE: 클러스터를 만든 리전의 영역(예: us-central1-c)

VM 만들기에 대한 자세한 내용은 특정 서브넷에 VM 인스턴스 만들기를 참고하세요.

IAM 역할 부여

Compute Engine 기본 서비스 계정에 다음 Identity and Access Management (IAM) 역할을 부여합니다.

  • 관리형 Kafka 클라이언트 (roles/managedkafka.client)
  • 서비스 계정 토큰 생성자 (roles/iam.serviceAccountTokenCreator)
  • 서비스 계정 OpenID 토큰 생성자 (roles/iam.serviceAccountOpenIdTokenCreator)

콘솔

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    IAM으로 이동

  2. Compute Engine 기본 서비스 계정 행을 찾아 주 구성원 수정을 클릭합니다.

  3. 다른 역할 추가를 클릭하고 관리형 Kafka 클라이언트 역할을 선택합니다. 서비스 계정 토큰 생성자서비스 계정 OpenID 토큰 생성자 역할에 대해 이 단계를 반복합니다.

  4. 저장을 클릭합니다.

gcloud

IAM 역할을 부여하려면 gcloud projects add-iam-policy-binding 명령어를 사용합니다.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID입니다.

  • PROJECT_NUMBER: 프로젝트 번호

프로젝트 번호를 가져오려면 gcloud projects describe 명령어를 실행합니다.

gcloud projects describe PROJECT_ID

자세한 내용은 프로젝트 이름, 번호, ID 찾기를 참고하세요.

VM에 연결

SSH를 사용하여 VM 인스턴스에 연결합니다.

콘솔

  1. VM 인스턴스 페이지로 이동합니다.

    VM 인스턴스로 이동

  2. VM 인스턴스 목록에서 VM 이름을 찾아 SSH를 클릭합니다.

gcloud

VM에 연결하려면 gcloud compute ssh 명령어를 사용합니다.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

다음을 바꿉니다.

  • VM_NAME: VM의 이름
  • PROJECT_ID: 프로젝트 ID입니다.
  • ZONE: VM을 만든 영역

SSH를 처음 사용하는 경우 추가 구성이 필요할 수 있습니다. 자세한 내용은 SSH 연결 정보를 참고하세요.

Kafka 명령줄 도구 설치

SSH 세션에서 다음 명령어를 실행하여 Kafka 명령줄 도구를 설치합니다.

  1. Kafka 명령줄 도구를 실행하는 데 필요한 Java와 종속 항목을 다운로드하는 데 도움이 되는 wget를 설치합니다. 다음 명령어는 Debian Linux 환경을 사용한다고 가정합니다.

    sudo apt-get install default-jre wget
    
  2. Kafka 명령줄 도구를 설치합니다.

    wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
    tar xfz kafka_2.13-3.7.2.tgz
    
  3. 다음 환경 변수를 설정합니다.

    export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
    export PATH=$PATH:$KAFKA_HOME/bin
    export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
    

인증 설정

SSH 세션에서 다음 단계를 실행하여 Apache Kafka용 관리형 서비스 인증 라이브러리를 설정합니다.

  1. 라이브러리를 다운로드하고 로컬로 설치합니다.

    wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
    sudo apt-get install unzip
    unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
    

    이 명령어는 Kafka 설치 디렉터리의 lib 디렉터리에 라이브러리를 설치합니다. Kafka 명령줄 도구는 이 디렉터리에서 Java 종속 항목을 찾습니다.

  2. 텍스트 편집기를 사용하여 client.properties이라는 파일을 만들고 다음을 붙여넣습니다.

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

    파일을 저장합니다. 이 파일은 다음 설정으로 Kafka 클라이언트를 구성합니다.

    • Kafka 클러스터와의 보안 통신에는 SASL_SSL을 사용합니다.

    • 인증에 OAuth 2.0 Bearer 토큰을 사용합니다.

    • 라이브러리에서 제공하는 GcpLoginCallbackHandler 클래스를 로그인 콜백 핸들러로 사용하여 OAuth 2.0 토큰을 가져옵니다.

메시지 생성 및 사용

SSH 세션에서 다음 명령어를 실행하여 Kafka 메시지를 생성하고 소비합니다.

  1. 부트스트랩 주소를 환경 변수로 설정합니다.

    export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

    다음을 바꿉니다.

    • CLUSTER_ID: 클러스터 이름입니다.
    • REGION: 클러스터를 만든 위치
    • PROJECT_ID: 프로젝트 ID입니다.

    자세한 내용은 부트스트랩 주소 가져오기를 참고하세요.

  2. 클러스터의 주제를 나열합니다.

    kafka-topics.sh --list \
      --bootstrap-server $BOOTSTRAP \
      --command-config client.properties
    
  3. 주제에 메시지를 작성합니다.

    echo "hello world" | kafka-console-producer.sh \
      --topic KAFKA_TOPIC_NAME \
      --bootstrap-server $BOOTSTRAP \
      --producer.config client.properties
    

    KAFKA_TOPIC_NAME을 주제 이름으로 바꿉니다.

  4. 주제에서 메시지를 사용합니다.

    kafka-console-consumer.sh \
      --topic KAFKA_TOPIC_NAME \
      --from-beginning \
      --bootstrap-server $BOOTSTRAP \
      --consumer.config client.properties
    

    메시지 사용을 중지하려면 Ctrl+C를 입력합니다.

  5. 생산자 성능 테스트를 실행합니다.

    kafka-producer-perf-test.sh \
      --topic KAFKA_TOPIC_NAME \
      --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \
      --producer-props bootstrap.servers=$BOOTSTRAP \
      --producer.config client.properties
    

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.

콘솔

  1. VM 인스턴스 삭제

    1. VM 인스턴스 페이지로 이동합니다.

      VM 인스턴스로 이동

    2. VM을 선택하고 삭제를 클릭합니다.

  2. Kafka 클러스터를 삭제합니다.

    1. Managed Service for Apache Kafka > 클러스터 페이지로 이동합니다.

      클러스터로 이동

    2. Kafka 클러스터를 선택하고 삭제를 클릭합니다.

gcloud

  1. VM을 삭제하려면 gcloud compute instances delete 명령어를 사용합니다.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Kafka 클러스터를 삭제하려면 gcloud managed-kafka clusters delete 명령어를 사용합니다.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.