Pub/Sub ソースコネクタは、Pub/Sub から Kafka にメッセージをストリーミングします。これにより、Pub/Sub を Kafka ベースのアプリケーションやデータ パイプラインと統合できます。
コネクタは、Pub/Sub サブスクリプションからメッセージを読み取り、各メッセージを Kafka レコードに変換して、Kafka トピックにレコードを書き込みます。デフォルトでは、コネクタは次のように Kafka レコードを作成します。
- Kafka レコードキーは
nullです。 - Kafka レコード値は、Pub/Sub メッセージ データ(バイト単位)です。
- Kafka レコード ヘッダーが空です。
ただし、この動作は構成できます。詳細については、コネクタを構成するをご覧ください。
始める前に
Pub/Sub ソースコネクタを作成する前に、次のことを確認してください。
サブスクリプションを含む Pub/Sub トピック。
Kafka クラスタ内の Kafka トピック。
Connect クラスタ。Connect クラスタを作成するときに、Managed Service for Apache Kafka クラスタをプライマリ Kafka クラスタとして設定します。
必要なロールと権限
Pub/Sub ソース コネクタの作成に必要な権限を取得するには、Connect クラスタを含むプロジェクトに対する Managed Kafka Connector 編集者 (roles/managedkafka.connectorEditor)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、Pub/Sub Source コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
Pub/Sub ソース コネクタを作成するには、次の権限が必要です。
-
親 Connect クラスタでコネクタの作成権限を付与します。
managedkafka.connectors.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Managed Kafka コネクタ編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
Managed Service for Apache Kafka クラスタが Connect クラスタと同じプロジェクトにある場合、追加の権限は必要ありません。Connect クラスタが別のプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。
Pub/Sub から読み取る権限を付与する
マネージド Kafka サービス アカウントには、Pub/Sub サブスクリプションからメッセージを読み取る権限が必要です。Pub/Sub サブスクリプションを含むプロジェクトのサービス アカウントに次の IAM ロールを付与します。
- Pub/Sub サブスクライバー(
roles/pubsub.subscriber) - Pub/Sub 閲覧者(
roles/pubsub.viewer)
マネージド Kafka サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com です。PROJECT_NUMBER は、プロジェクト番号に置き換えます。
Pub/Sub ソースコネクタを作成する
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
コネクタを作成する Connect クラスタをクリックします。
[コネクタを作成] をクリックします。
コネクタ名には文字列を入力します。
コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
[コネクタ プラグイン] で、[Pub/Sub ソース] を選択します。
[Cloud Pub/Sub サブスクリプション] リストで、Pub/Sub サブスクリプションを選択します。コネクタはこのサブスクリプションからメッセージを pull します。サブスクリプションは、
projects/{project}/subscriptions/{subscription}のように完全なリソース名で表示されます。[Kafka トピック] リストで、メッセージが書き込まれる Kafka トピックを選択します。
省略可: [構成] ボックスで、構成プロパティを追加するか、デフォルトのプロパティを編集します。詳細については、コネクタを構成するをご覧ください。
[タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。
[作成] をクリックします。
gcloud
gcloud managed-kafka connectors createコマンドを実行します。gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE次のように置き換えます。
CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
LOCATION: Connect クラスタのロケーション。
CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。
CONFIG_FILE: YAML または JSON 構成ファイルへのパス。
構成ファイルの例を次に示します。
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
次のように置き換えます。
PROJECT_ID: Pub/Sub サブスクリプションが存在する Google Cloudプロジェクトの ID。
PUBSUB_SUBSCRIPTION_ID: データの pull 元となる Pub/Sub サブスクリプションの ID。
KAFKA_TOPIC_ID: データが書き込まれる Kafka トピックの ID。
cps.project、cps.subscription、kafka.topic 構成プロパティは必須です。その他の構成オプションについては、コネクタを構成するをご覧ください。
Terraform
Terraform リソースを使用してコネクタを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
コネクタを作成した後は、コネクタの編集、削除、一時停止、停止、再起動を行うことができます。
コネクタを構成する
このセクションでは、コネクタで設定できる構成プロパティについて説明します。
このコネクタに固有のプロパティの一覧については、Pub/Sub ソースコネクタの構成をご覧ください。
プルモード
pull モードでは、コネクタが Pub/Sub メッセージを取得する方法を指定します。次のモードがサポートされています。
プルモード(デフォルト)。メッセージはバッチで取得されます。このモードを有効にするには、
cps.streamingPull.enabled=false.を設定します。バッチサイズを構成するには、cps.maxBatchSizeプロパティを設定します。プルモードの詳細については、Pull API をご覧ください。
ストリーミング プルモード。Pub/Sub からメッセージを取得する際に、最大スループットと最小レイテンシを実現します。このモードを有効にするには、
cps.streamingPull.enabled=trueを設定します。ストリーミング プルモードの詳細については、StreamingPull API をご覧ください。
ストリーミング プルが有効になっている場合は、次の構成プロパティを設定してパフォーマンスを調整できます。
cps.streamingPull.flowControlBytes: タスクあたりの未処理メッセージ バイトの最大数。cps.streamingPull.flowControlMessages: タスクあたりの未処理メッセージの最大数。cps.streamingPull.maxAckExtensionMs: コネクタがサブスクライブの期限を延長する最大時間(ミリ秒単位)。cps.streamingPull.maxMsPerAckExtension: コネクタが拡張ごとにサブスクライブの期限を延長する最大時間(ミリ秒単位)。cps.streamingPull.parallelStreams: サブスクリプションからメッセージを pull するストリームの数。
Pub/Sub エンドポイント
デフォルトでは、コネクタはグローバル Pub/Sub エンドポイントを使用します。エンドポイントを指定するには、cps.endpoint プロパティをエンドポイント アドレスに設定します。エンドポイントの詳細については、Pub/Sub エンドポイントをご覧ください。
Kafka レコード
Pub/Sub ソースコネクタは、Pub/Sub メッセージを Kafka レコードに変換します。以降のセクションでは、変換プロセスについて説明します。
レコードキー
キー コンバータは org.apache.kafka.connect.storage.StringConverter である必要があります。
デフォルトでは、レコードキーは
nullです。Pub/Sub メッセージ属性をキーとして使用するには、
kafka.key.attributeを属性の名前に設定します。例:kafka.key.attribute=usernamePub/Sub の順序キーをキーとして使用するには、
kafka.key.attribute=orderingKeyを設定します。
レコード ヘッダー
デフォルトでは、レコード ヘッダーは空です。
kafka.record.headers が true の場合、Pub/Sub メッセージ属性はレコード ヘッダーとして書き込まれます。順序指定キーを含めるには、cps.makeOrderingKeyAttribute=true を設定します。
レコード値
kafka.record.headers が true の場合、または Pub/Sub メッセージにカスタム属性がない場合、レコード値はメッセージ データ(バイト配列)になります。値コンバータを org.apache.kafka.connect.converters.ByteArrayConverter に設定します。
それ以外の場合、kafka.record.headers が false で、メッセージに 1 つ以上のカスタム属性がある場合、コネクタはレコード値を struct として書き込みます。値コンバータを org.apache.kafka.connect.json.JsonConverter に設定します。
struct には次のフィールドがあります。
message: Pub/Sub メッセージ データ(バイト単位)。Pub/Sub メッセージ属性のフィールド。順序キーを含めるには、
cps.makeOrderingKeyAttribute=trueを設定します。
たとえば、メッセージに username 属性があるとします。
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
value.converter.schemas.enable が true の場合、struct にはペイロードとスキーマの両方が含まれます。
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Kafka パーティション
デフォルトでは、コネクタはトピックの単一のパーティションに書き込みます。コネクタが書き込むパーティションの数を指定するには、kafka.partition.count プロパティを設定します。値はトピックのパーティション数を超えないようにする必要があります。
コネクタがパーティションにメッセージを割り当てる方法を指定するには、kafka.partition.scheme プロパティを設定します。詳細については、Pub/Sub ソース コネクタの構成をご覧ください。