Pub/Sub Sink コネクタは、Kafka トピックから Pub/Sub トピックにメッセージをストリーミングします。これにより、Kafka ベースのアプリケーションを Pub/Sub と統合し、イベント ドリブン アーキテクチャとリアルタイム データ処理を容易に実現できます。
始める前に
Pub/Sub Sink コネクタを作成する前に、次のことを確認してください。
Connect クラスタの Managed Service for Apache Kafka クラスタを作成します。これは、Connect クラスタに関連付けられているプライマリ Kafka クラスタです。これは、コネクタ パイプラインの一端を形成するソース クラスタでもあります。
Pub/Sub シンクコネクタをホストする Connect クラスタを作成します。
ソース クラスタ内に Kafka トピックを作成して構成します。データは、この Kafka トピックから宛先 Pub/Sub トピックに移動します。
必要なロールと権限
Pub/Sub シンク コネクタの作成に必要な権限を取得するには、Connect クラスタを含むプロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。
-
マネージド Kafka コネクタ編集者 (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Pub/Sub パブリッシャー (
roles/pubsub.publisher)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
これらの事前定義ロールには、Pub/Sub Sink コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
Pub/Sub シンク コネクタを作成するには、次の権限が必要です。
-
親 Connect クラスタでコネクタの作成権限を付与します。
managedkafka.connectors.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Managed Kafka Connector 編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
Managed Service for Apache Kafka クラスタが Connect クラスタと同じプロジェクトにある場合、追加の権限は必要ありません。Connect クラスタが別のプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。
Pub/Sub トピックにパブリッシュする権限を付与する
Connect クラスタ サービス アカウント(形式は service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com)には、Pub/Sub トピックにメッセージをパブリッシュする権限が必要です。これを行うには、Pub/Sub トピックを含むプロジェクトの Connect クラスタ サービス アカウントに Pub/Sub パブリッシャー ロール(roles/pubsub.publisher)を付与します。
Pub/Sub シンクコネクタの仕組み
Pub/Sub シンクコネクタは、1 つ以上の Kafka トピックからメッセージを pull し、Pub/Sub トピックにパブリッシュします。
Pub/Sub シンクコネクタがデータをコピーする仕組みの詳細を以下に示します。
コネクタは、ソースクラスタ内の 1 つ以上の Kafka トピックからメッセージを消費します。
コネクタは、
cps.topic構成プロパティを使用して指定されたターゲット Pub/Sub トピック ID にメッセージを書き込みます。これは必須プロパティです。また、コネクタでは、
cps.project構成プロパティを使用して、Pub/Sub トピックを含む Google Cloud プロジェクトを指定する必要があります。これは必須プロパティです。コネクタは、
cps.endpointプロパティを使用して指定されたカスタム Pub/Sub エンドポイントを必要に応じて使用することもできます。デフォルトのエンドポイントは"pubsub.googleapis.com:443"です。パフォーマンスを最適化するため、コネクタはメッセージをバッファリングしてから Pub/Sub に公開します。
maxBufferSize、maxBufferBytes、maxDelayThresholdMs、maxOutstandingRequestBytes、maxOutstandingMessagesを構成して、バッファリングを制御できます。Kafka レコードには、ヘッダー、キー、値の 3 つのコンポーネントがあります。コネクタは、キーと値のコンバータを使用して、Kafka メッセージ データを Pub/Sub で想定される形式に変換します。構造体またはマップ値スキーマを使用する場合、
messageBodyNameプロパティは、Pub/Sub メッセージ本文として使用するフィールドまたはキーを指定します。コネクタは、
metadata.publishプロパティをtrueに設定することで、Kafka トピック、パーティション、オフセット、タイムスタンプをメッセージ属性として含めることができます。コネクタは、
headers.publishプロパティをtrueに設定することで、Kafka メッセージ ヘッダーを Pub/Sub メッセージ属性として含めることができます。コネクタは、
orderingKeySourceプロパティを使用して Pub/Sub メッセージの順序指定キーを含めることができます。値のオプションには、"none"(デフォルト)、"key"、"partition"があります。tasks.maxプロパティは、コネクタの並列処理のレベルを制御します。tasks.maxを増やすとスループットを改善できますが、実際の並列処理は Kafka トピックのパーティション数によって制限されます。
Pub/Sub シンクコネクタのプロパティ
Pub/Sub シンクコネクタを作成するときは、次のプロパティを指定する必要があります。
コネクタ名
Connect クラスタ内のコネクタの一意の名前。リソースの命名ガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
コネクタ プラグインのタイプ
コネクタ プラグインのタイプとして [Pub/Sub シンク] を選択します。これにより、Kafka から Pub/Sub へのデータフローの方向と、使用される特定のコネクタ実装が決まります。ユーザー インターフェースを使用してコネクタを構成しない場合は、コネクタ クラスも指定する必要があります。
Kafka トピック
コネクタがメッセージを消費する Kafka トピック。1 つ以上のトピックを指定することも、正規表現を使用して複数のトピックを照合することもできます。たとえば、topic.* は「topic」で始まるすべてのトピックに一致します。これらのトピックは、Connect クラスタに関連付けられている Managed Service for Apache Kafka クラスタ内に存在する必要があります。
Pub/Sub トピック
コネクタがメッセージをパブリッシュする既存の Pub/Sub トピック。始める前にで説明したように、Connect クラスタのサービス アカウントにトピックのプロジェクトに対する roles/pubsub.publisher ロールがあることを確認します。
構成
このセクションでは、コネクタ固有の追加の構成プロパティを指定できます。
Kafka トピックのデータは、Avro、JSON、未加工のバイトなど、さまざまな形式で指定できるため、構成の重要な部分としてコンバータの指定があります。コンバータは、Kafka トピックで使用される形式のデータを Kafka Connect の標準化された内部形式に変換します。Pub/Sub シンク コネクタは、この内部データを取得し、Pub/Sub で必要な形式に変換してから書き込みます。
Kafka Connect のコンバータの役割、サポートされているコンバータのタイプ、一般的な構成オプションの詳細については、コンバータをご覧ください。
Pub/Sub シンクコネクタに固有の構成を次に示します。
cps.project: Pub/Sub トピックを含む Google Cloud プロジェクト ID を指定します。cps.topic: データの公開先の Pub/Sub トピックを指定します。cps.endpoint: 使用する Pub/Sub エンドポイントを指定します。
このコネクタに固有の使用可能な構成プロパティのリストについては、Pub/Sub シンク コネクタの構成をご覧ください。
Pub/Sub シンクコネクタを作成する
コネクタを作成する前に、Pub/Sub シンクコネクタのプロパティのドキュメントを確認してください。
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
コネクタを作成する Connect クラスタをクリックします。
[クラスタの詳細を接続] ページが表示されます。
[コネクタを作成] をクリックします。
[Kafka コネクタの作成] ページが表示されます。
コネクタ名には文字列を入力します。
コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
[コネクタ プラグイン] で、[Pub/Sub シンク] を選択します。
[トピック] で、[Kafka トピックのリストを選択する] または [トピックの正規表現を使用する] を選択します。次に、このコネクタがメッセージを消費する Kafka トピックを選択または入力します。これらのトピックは、関連付けられた Kafka クラスタにあります。
[Cloud Pub/Sub トピックを選択してください] で、このコネクタがメッセージをパブリッシュする Pub/Sub トピックを選択します。トピックは、完全なリソース名形式(
projects/{project}/topics/{topic})で表示されます。(省略可)[構成] セクションで追加の設定を行います。前のセクションで説明したように、
tasks.max、key.converter、value.converterなどのプロパティを指定します。[タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。
[作成] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connectors createコマンドを実行します。gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE次のように置き換えます。
CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
LOCATION: コネクタを作成するロケーション。これは、Connect クラスタを作成したロケーションと同じである必要があります。
CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。
CONFIG_FILE: BigQuery Sink コネクタの YAML 構成ファイルへのパス。
Pub/Sub Sink コネクタの構成ファイルの例を次に示します。
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"次のように置き換えます。
CPS_SINK_CONNECTOR_ID: Pub/Sub Sink コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
GMK_TOPIC_ID: Pub/Sub Sink コネクタがデータを読み取る Managed Service for Apache Kafka トピックの ID。
CPS_TOPIC_ID: データの公開先の Pub/Sub トピックの ID。
GCP_PROJECT_ID: Pub/Sub トピックが存在する Google Cloudプロジェクトの ID。
Terraform
Terraform リソースを使用してコネクタを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
コネクタを作成した後は、コネクタの編集、削除、一時停止、停止、再起動を行うことができます。