Google Cloud Managed Service for Apache Kafka トピックを作成する

Managed Service for Apache Kafka では、メッセージはトピックに整理されます。トピックはパーティションで構成されます。パーティションは、Kafka クラスタ内の単一のブローカーが所有するレコードの順序付けされた不変のシーケンスです。メッセージをパブリッシュまたはコンシュームするには、トピックを作成する必要があります。

トピックを作成するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、Managed Kafka API、またはオープンソースの Apache Kafka API を使用します。

始める前に

トピックを作成する前に、クラスタを作成する必要があります。次の項目が設定されていることを確認します。

トピックの作成に必要なロールと権限

トピックの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka トピック編集者 roles/managedkafka.topicEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、トピックの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

トピックを作成するには、次の権限が必要です。

  • トピックを作成する: managedkafka.topics.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

マネージド Kafka トピック編集者のロールには、マネージド Kafka 閲覧者のロールも含まれています。このロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

Managed Service for Apache Kafka トピックのプロパティ

Managed Service for Apache Kafka トピックを作成または更新するときは、次のプロパティを指定する必要があります。

トピック名

作成する Managed Service for Apache Kafka トピックの名前。トピックの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。トピックの名前は変更できません。

パーティション数

トピック内のパーティションの数。トピックを編集して、トピックのパーティション数を増やすことはできますが、減らすことはできません。キーを使用するトピックのパーティション数を増やすと、メッセージの配信方法が変わる可能性があります。

レプリケーション係数

各パーティションのレプリカ数。値を指定しない場合、クラスタのデフォルトのレプリケーション係数が使用されます。

レプリケーション係数を大きくすると、データが複数のブローカーに複製されるため、ブローカーの障害が発生した場合のデータ整合性を高めることができます。本番環境では、レプリケーション ファクタを 3 以上にすることをおすすめします。レプリカ数が多いほど、トピックのローカル ストレージとデータ転送の費用が増加します。ただし、永続ストレージの費用は増加しません。レプリケーション係数は、使用可能なブローカーの数を超えることはできません。

その他のパラメータ

他の Apache Kafka トピックレベルの構成パラメータを設定することもできます。これらは、クラスタのデフォルトをオーバーライドする key=value ペアとして指定されます。

トピックに関連する構成には、サーバーのデフォルトと、トピックごとのオーバーライド(省略可)があります。形式は KEY=VALUE ペアのカンマ区切りリストです。ここで、KEY は Kafka トピック構成プロパティの名前、VALUE は必要な設定です。これらの Key-Value ペアは、クラスタのデフォルトをオーバーライドするのに役立ちます。たとえば、flush.ms=10compression.type=producer などがあります。

サポートされているトピックレベルの構成の完全な一覧については、Apache Kafka のドキュメントのトピックレベルの構成をご覧ください。

トピックの作成

トピックを作成する前に、トピックのプロパティを確認してください。

コンソール

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

  2. トピックを作成するクラスタをクリックします。

    [クラスタの詳細] ページが開きます。

  3. クラスタの詳細ページで、[トピックを作成] をクリックします。

    [Kafka トピックの作成] ページが開きます。

  4. [トピック名] に文字列を入力します。

  5. [パーティション数] に、必要なパーティション数を入力するか、デフォルト値をそのまま使用します。

  6. [レプリケーション係数] に、必要なレプリケーション係数を入力するか、デフォルト値をそのまま使用します。

  7. (省略可)トピック構成を変更するには、[構成] フィールドにカンマ区切りの Key-Value ペアとして追加します。

  8. [作成] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka topics create コマンドを実行します。

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    次のように置き換えます。

    • TOPIC_ID: トピックの名前。
    • CLUSTER: トピックを作成するクラスタの名前。
    • LOCATION: クラスタのリージョン。
    • PARTITIONS: トピックのパーティション数。
    • REPLICATION_FACTOR: トピックのレプリケーション ファクタ。
    • CONFIGS: トピック レベルの省略可能なパラメータ。カンマ区切りの Key-Value ペアとして指定します。例: compression.type=producer
  3. Kafka CLI

    このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールを使用してメッセージを生成して使用するの手順に沿って操作します。

    次のように kafka-topics.sh コマンドを実行します。

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    次のように置き換えます。

    • BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka クラスタのブートストラップ アドレス

    • TOPIC_ID: トピックの名前。

    • PARTITIONS: トピックのパーティション数。

    • REPLICATION_FACTOR: トピックのレプリケーション ファクタ。

    REST

    リクエストのデータを使用する前に、次のように置き換えます。

    • PROJECT_ID: 実際の Google Cloud プロジェクト ID
    • LOCATION: クラスタのロケーション
    • CLUSTER_ID: クラスタの ID
    • TOPIC_ID: トピックの ID
    • PARTITION_COUNT: トピックのパーティション数
    • REPLICATION_FACTOR: 各パーティションのレプリカ数

    HTTP メソッドと URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    リクエストの本文(JSON):

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    リクエストを送信するには、次のいずれかのオプションを展開します。

    次のような JSON レスポンスが返されます。

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    Terraform リソースを使用してトピックを作成できます。

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

    Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

次のステップ