Managed Service for Apache Kafka では、メッセージはトピックに整理されます。トピックはパーティションで構成されます。パーティションは、Kafka クラスタ内の単一のブローカーが所有するレコードの順序付けされた不変のシーケンスです。メッセージをパブリッシュまたはコンシュームするには、トピックを作成する必要があります。
トピックを作成するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、Managed Kafka API、またはオープンソースの Apache Kafka API を使用します。
始める前に
トピックを作成する前に、クラスタを作成する必要があります。次の項目が設定されていることを確認します。
トピックの作成に必要なロールと権限
トピックの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka トピック編集者 (roles/managedkafka.topicEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、トピックの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
トピックを作成するには、次の権限が必要です。
-
トピックを作成する:
managedkafka.topics.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
マネージド Kafka トピック編集者のロールには、マネージド Kafka 閲覧者のロールも含まれています。このロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
Managed Service for Apache Kafka トピックのプロパティ
Managed Service for Apache Kafka トピックを作成または更新するときは、次のプロパティを指定する必要があります。
トピック名
作成する Managed Service for Apache Kafka トピックの名前。トピックの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。トピックの名前は変更できません。
パーティション数
トピック内のパーティションの数。トピックを編集して、トピックのパーティション数を増やすことはできますが、減らすことはできません。キーを使用するトピックのパーティション数を増やすと、メッセージの配信方法が変わる可能性があります。
レプリケーション係数
各パーティションのレプリカ数。値を指定しない場合、クラスタのデフォルトのレプリケーション係数が使用されます。
レプリケーション係数を大きくすると、データが複数のブローカーに複製されるため、ブローカーの障害が発生した場合のデータ整合性を高めることができます。本番環境では、レプリケーション ファクタを 3 以上にすることをおすすめします。レプリカ数が多いほど、トピックのローカル ストレージとデータ転送の費用が増加します。ただし、永続ストレージの費用は増加しません。レプリケーション係数は、使用可能なブローカーの数を超えることはできません。
その他のパラメータ
他の Apache Kafka トピックレベルの構成パラメータを設定することもできます。これらは、クラスタのデフォルトをオーバーライドする key=value ペアとして指定されます。
トピックに関連する構成には、サーバーのデフォルトと、トピックごとのオーバーライド(省略可)があります。形式は KEY=VALUE ペアのカンマ区切りリストです。ここで、KEY は Kafka トピック構成プロパティの名前、VALUE は必要な設定です。これらの Key-Value ペアは、クラスタのデフォルトをオーバーライドするのに役立ちます。たとえば、flush.ms=10 や compression.type=producer などがあります。
サポートされているトピックレベルの構成の完全な一覧については、Apache Kafka のドキュメントのトピックレベルの構成をご覧ください。
トピックの作成
トピックを作成する前に、トピックのプロパティを確認してください。
コンソール
Google Cloud コンソールで、[クラスタ] ページに移動します。
トピックを作成するクラスタをクリックします。
[クラスタの詳細] ページが開きます。
クラスタの詳細ページで、[トピックを作成] をクリックします。
[Kafka トピックの作成] ページが開きます。
[トピック名] に文字列を入力します。
[パーティション数] に、必要なパーティション数を入力するか、デフォルト値をそのまま使用します。
[レプリケーション係数] に、必要なレプリケーション係数を入力するか、デフォルト値をそのまま使用します。
(省略可)トピック構成を変更するには、[構成] フィールドにカンマ区切りの Key-Value ペアとして追加します。
[作成] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka topics createコマンドを実行します。gcloud managed-kafka topics create TOPIC_ID \ --cluster=CLUSTER_ID --location=LOCATION_ID \ --partitions=PARTITIONS \ --replication-factor=REPLICATION_FACTOR \ --configs=CONFIGS次のように置き換えます。
- TOPIC_ID: トピックの名前。
- CLUSTER: トピックを作成するクラスタの名前。
- LOCATION: クラスタのリージョン。
- PARTITIONS: トピックのパーティション数。
- REPLICATION_FACTOR: トピックのレプリケーション ファクタ。
- CONFIGS: トピック レベルの省略可能なパラメータ。カンマ区切りの Key-Value ペアとして指定します。例:
compression.type=producer
BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka クラスタのブートストラップ アドレス。
TOPIC_ID: トピックの名前。
PARTITIONS: トピックのパーティション数。
REPLICATION_FACTOR: トピックのレプリケーション ファクタ。
-
PROJECT_ID: 実際の Google Cloud プロジェクト ID -
LOCATION: クラスタのロケーション -
CLUSTER_ID: クラスタの ID -
TOPIC_ID: トピックの ID -
PARTITION_COUNT: トピックのパーティション数 -
REPLICATION_FACTOR: 各パーティションのレプリカ数
Kafka CLI
このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールを使用してメッセージを生成して使用するの手順に沿って操作します。
次のように kafka-topics.sh コマンドを実行します。
kafka-topics.sh --create --if-not-exists \
--bootstrap-server=BOOTSTRAP_ADDRESS \
--command-config client.properties \
--topic TOPIC_ID \
--partitions PARTITIONS \
--replication-factor REPLICATION_FACTOR
次のように置き換えます。
REST
リクエストのデータを使用する前に、次のように置き換えます。
HTTP メソッドと URL:
POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID
リクエストの本文(JSON):
{
"name": "TOPIC_ID",
"partitionCount": PARTITION_COUNT,
"replicationFactor": REPLICATION_FACTOR
}
リクエストを送信するには、次のいずれかのオプションを展開します。
次のような JSON レスポンスが返されます。
{
"name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
"partitionCount": PARTITION_COUNT,
"replicationFactor": REPLICATION_FACTOR
}
Terraform
Terraform リソースを使用してトピックを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。