Apache Kafka용 Google Cloud 관리형 서비스 주제 만들기

Managed Service for Apache Kafka에서 메시지는 주제로 구성됩니다. 주제는 파티션으로 구성됩니다. 파티션은 Kafka 클러스터 내에서 단일 브로커가 소유하는 순서가 지정되고 변경 불가능한 레코드 시퀀스입니다. 메시지를 게시하거나 사용하려면 주제를 만들어야 합니다.

주제를 만들려면 Google Cloud 콘솔, Google Cloud CLI, 클라이언트 라이브러리, 관리형 Kafka API 또는 오픈소스 Apache Kafka API를 사용하면 됩니다.

시작하기 전에

주제를 만들기 전에 먼저 클러스터를 만들어야 합니다. 다음을 설정했는지 확인합니다.

주제를 만드는 데 필요한 역할 및 권한

주제를 만드는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 주제 편집자 (roles/managedkafka.topicEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 주제를 만드는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

주제를 만들려면 다음 권한이 필요합니다.

  • 주제 만들기: managedkafka.topics.create

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

관리형 Kafka 주제 편집자 역할에는 관리형 Kafka 뷰어 역할도 포함됩니다. 이 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.

Apache Kafka용 관리형 서비스 주제의 속성

Apache Kafka용 관리형 서비스 주제를 만들거나 업데이트할 때는 다음 속성을 지정해야 합니다.

주제 이름

생성 중인 Apache Kafka용 관리형 서비스 주제의 이름입니다. 주제 이름을 지정하는 방법에 대한 가이드라인은 Apache Kafka용 관리형 서비스 리소스 이름 지정 가이드라인을 참고하세요. 주제 이름은 변경할 수 없습니다.

파티션 수

주제의 파티션 수입니다. 주제를 수정하여 주제의 파티션 수를 늘릴 수는 있지만 줄일 수는 없습니다. 키를 사용하는 주제의 파티션 수를 늘리면 메시지 배포 방식이 변경될 수 있습니다.

복제 인수

각 파티션의 복제본 수입니다. 값을 지정하지 않으면 클러스터의 기본 복제 요소가 사용됩니다.

데이터가 여러 브로커에 복제되므로 복제 요소가 높을수록 브로커 오류 발생 시 데이터 일관성이 향상될 수 있습니다. 프로덕션 환경에서는 복제 요소가 3 이상인 것이 좋습니다. 복제본 수가 많을수록 주제의 로컬 스토리지 및 데이터 전송 비용이 증가합니다. 하지만 영구 스토리지 비용은 증가하지 않습니다. 복제 요소는 사용 가능한 브로커 수를 초과할 수 없습니다.

기타 매개변수

다른 Apache Kafka 주제 수준 구성 매개변수를 설정할 수도 있습니다. 이는 클러스터 기본값을 재정의하는 key=value 쌍으로 지정됩니다.

주제와 관련된 구성에는 서버 기본값과 주제별 재정의(선택사항)가 있습니다. 형식은 KEY=VALUE 쌍의 쉼표로 구분된 목록입니다. 여기서 KEY은 Kafka 주제 구성 속성의 이름이고 VALUE은 필수 설정입니다. 이러한 키-값 쌍은 클러스터 기본값을 재정의하는 데 도움이 됩니다. 예로는 flush.ms=10compression.type=producer이 있습니다.

지원되는 모든 주제 수준 구성의 목록은 Apache Kafka 문서의 주제 수준 구성을 참고하세요.

주제 만들기

주제를 만들기 전에 주제 속성을 검토하세요.

콘솔

  1. Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.

    클러스터로 이동

  2. 주제를 만들 클러스터를 클릭합니다.

    클러스터 세부정보 페이지가 열립니다.

  3. 클러스터 세부정보 페이지에서 주제 만들기를 클릭합니다.

    Kafka 주제 만들기 페이지가 열립니다.

  4. 주제 이름에 문자열을 입력합니다.

  5. 파티션 수에 원하는 파티션 수를 입력하거나 기본값을 유지합니다.

  6. 복제 인수에 원하는 복제 인수를 입력하거나 기본값을 유지합니다.

  7. (선택사항) 주제 구성을 변경하려면 구성 필드에 쉼표로 구분된 키-값 쌍으로 추가합니다.

  8. 만들기를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka topics create 명령어를 실행합니다.

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    다음을 바꿉니다.

    • TOPIC_ID: 주제의 이름입니다.
    • CLUSTER: 주제를 만들려는 클러스터의 이름입니다.
    • LOCATION: 클러스터의 리전
    • PARTITIONS: 주제의 파티션 수입니다.
    • REPLICATION_FACTOR: 주제의 복제 요소입니다.
    • CONFIGS: 주제 수준 선택적 매개변수입니다. 쉼표로 구분된 키-값 쌍으로 지정합니다. 예를 들면 compression.type=producer입니다.
  3. Kafka CLI

    이 명령어를 실행하기 전에 Compute Engine VM에 Kafka 명령줄 도구를 설치하세요. VM은 Apache Kafka용 관리형 서비스 클러스터에 연결된 서브넷에 도달할 수 있어야 합니다. Kafka 명령줄 도구로 메시지 생성 및 사용의 안내를 따릅니다.

    다음과 같이 kafka-topics.sh 명령어를 실행합니다.

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    다음을 바꿉니다.

    • BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka 클러스터의 부트스트랩 주소입니다.

    • TOPIC_ID: 주제의 이름입니다.

    • PARTITIONS: 주제의 파티션 수입니다.

    • REPLICATION_FACTOR: 주제의 복제 요소입니다.

    REST

    요청 데이터를 사용하기 전에 다음을 바꿉니다.

    • PROJECT_ID: Google Cloud 프로젝트 ID
    • LOCATION: 클러스터의 위치
    • CLUSTER_ID: 클러스터의 ID
    • TOPIC_ID: 주제의 ID
    • PARTITION_COUNT: 주제의 파티션 수
    • REPLICATION_FACTOR: 각 파티션의 복제본 수

    HTTP 메서드 및 URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    JSON 요청 본문:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    요청을 보내려면 다음 옵션 중 하나를 펼칩니다.

    다음과 비슷한 JSON 응답이 표시됩니다.

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    Terraform 리소스를 사용하여 주제를 만들 수 있습니다.

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Terraform 구성을 적용하거나 삭제하는 방법은 기본 Terraform 명령어를 참조하세요.

    Go

    이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    자바

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

다음 단계