このドキュメントでは、Pub/Sub グループの Kafka コネクタを使用して Apache Kafka と Pub/Sub を統合する方法について説明します。
Pub/Sub Kafka コネクタの概要
Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。
Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。コネクタ JAR には次のコネクタがパッケージ化されています。
- シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub に公開します。
- ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka にパブリッシュします。
Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。
- Kafka ベースのアーキテクチャを Google Cloudに移行する。
- フロントエンド システムがGoogle Cloud外の Kafka にイベントを保存するが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用する。
- オンプレミスの Kafka ソリューションからログを収集し、Google Cloud に送信してデータ分析を行う。
- フロントエンド システムは Google Cloudを使用しているが、Kafka を使用してオンプレミスでもデータを保存する。
このコネクタには、Kafka と他のシステム間でデータをストリーミングするためのフレームワークである Kafka コネクト が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。
このドキュメントでは、Kafka と Pub/Sub の両方に精通していることを前提としています。この文書を読む前に、Pub/Sub クイックスタートのいずれかを完了することをお勧めします。
Pub/Sub コネクタは、 Google Cloud IAM と Kafka Connect ACL 間の統合をサポートしていません。
コネクタを使ってみる
このセクションでは、次のタスクについて説明します。- Pub/Sub グループの Kafka コネクタを構成する
- Kafka から Pub/Sub にイベントを送信する。
- Pub/Sub から Kafka にメッセージを送信する。
前提条件
Kafka をインストールする
Apache Kafka クイックスタートに沿って、ローカルマシンにシングルノード Kafka をインストールします。クイックスタートで、次の手順を行います。
- 最新の Kafka リリースをダウンロードして展開します。
- Kafka 環境を起動します。
- Kafka トピックを作成します。
認証
Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。
- Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
-
ユーザー アカウントのローカル認証情報を作成します。
gcloud auth application-default login
認証エラーが返され、外部 ID プロバイダ(IdP)を使用している場合は、 連携 ID を使用して gcloud CLI にログインしていることを確認します。
-
ユーザー アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
次のように置き換えます。
PROJECT_ID: プロジェクト ID。USER_IDENTIFIER: ユーザー アカウントの識別子。例:myemail@example.comROLE: ユーザー アカウントに付与する IAM ロール。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
-
ユーザー アカウントのローカル認証情報を作成します。
gcloud auth application-default login
認証エラーが返され、外部 ID プロバイダ(IdP)を使用している場合は、 連携 ID を使用して gcloud CLI にログインしていることを確認します。
-
ユーザー アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
次のように置き換えます。
PROJECT_ID: プロジェクト ID。USER_IDENTIFIER: ユーザー アカウントの識別子。例:myemail@example.comROLE: ユーザー アカウントに付与する IAM ロール。
コネクタ JAR をダウンロードする
ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。
コネクタ構成ファイルをコピーする
コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connectorconfigディレクトリの内容を Kafka インストールのconfigサブディレクトリにコピーします。cp config/* [path to Kafka installation]/config/
これらのファイルには、コネクタの構成設定が含まれています。
Kafka Connect 構成を更新する
- ダウンロードした Kafka コネクト バイナリが含まれているディレクトリに移動します。
- Kafka コネクト バイナリ ディレクトリにある
config/connect-standalone.propertiesという名前のファイルをテキスト エディタで開きます。 plugin.path propertyがコメントアウトされている場合は、コメント化解除します。plugin.path propertyを更新して、コネクタ JAR へのパスを追加します。例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jaroffset.storage.file.filenameプロパティをローカル ファイル名に設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。例:
offset.storage.file.filename=/tmp/connect.offsets
Kafka から Pub/Sub にイベントを転送する
このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub から転送されたメッセージを読み取る方法について説明します。
Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub トピックを作成します。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
以下を置き換えます。
- PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピックの名前。
- PUBSUB_SUBSCRIPTION: トピックの Pub/Sub サブスクリプションの名前。
テキスト エディタで
/config/cps-sink-connector.propertiesというファイルを開きます。コメントで"TODO"とマークされている次のプロパティの値を追加します。topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
以下を置き換えます。
- KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
- PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
- PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピック。
Kafka ディレクトリから次のコマンドを実行します。
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.propertiesApache Kafka クイックスタートの手順に沿って、Kafka トピックにイベントを書き込みます。
gcloud CLI を使用して、Pub/Sub からイベントを読み取ります。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Pub/Sub から Kafka にメッセージを転送する。
このセクションでは、ソースコネクタの起動、Pub/Sub へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。
gcloud CLI を使用して、サブスクリプションで Pub/Sub トピックを作成します。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
以下を置き換えます。
- PUBSUB_TOPIC: Pub/Sub トピックの名前。
- PUBSUB_SUBSCRIPTION: Pub/Sub サブスクリプションの名前。
テキスト エディタで
/config/cps-source-connector.propertiesという名前のファイルを開きます。コメントで"TODO"とマークされている次のプロパティの値を追加します。kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
以下を置き換えます。
- KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
- PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
- PUBSUB_TOPIC: Pub/Sub トピック。
Kafka ディレクトリから次のコマンドを実行します。
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.propertiesメッセージを gcloud CLI を使用して Pub/Sub にパブリッシュします。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。
メッセージ コンバージョン
Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。必要に応じて、Key-Value ペアであるKafka レコードのヘッダーを含めることもできます。Pub/Sub メッセージには、メッセージ本文と 0 個以上の Key-Value 属性の 2 つの主要部分があります。
Kafka コネクトはコンバータを使用して、Kafka との間でキーと値をシリアル化します。 シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。
key.converter: レコードキーをシリアル化するために使用されるコンバータ。value.converter: レコード値をシリアル化するために使用されるコンバータ。
Pub/Sub メッセージの本文は ByteString オブジェクトであるため、最も効率的な変換はペイロードを直接コピーすることです。そのため、可能であれば、同じメッセージ本文のシリアル化解除と再シリアル化を防ぐため、プリミティブ データ型(整数、浮動小数点、文字列、バイトスキーマ)を生成するコンバータの使用をおすすめします。
Kafka から Pub/Sub への変換
シンクコネクタは、Kafka レコードを次のように Pub/Sub メッセージに変換します。
- Kafka レコードキーは、Pub/Sub メッセージに
"key"という名前の属性として保存されます。 - デフォルトでは、コネクタは Kafka レコードのヘッダーをすべてドロップします。ただし、
headers.publish構成オプションをtrueに設定すると、コネクタはヘッダーを Pub/Sub 属性として書き込みます。コネクタは、Pub/Sub のメッセージ属性の制限を超えるヘッダーをスキップします。 - 整数、浮動小数点数、文字列、バイトのスキーマの場合、コネクタは Kafka レコード値のバイトを Pub/Sub メッセージ本文に直接渡します。
- 構造体スキーマの場合、コネクタは各フィールドを Pub/Sub メッセージの属性として書き込みます。たとえば、フィールドが
{ "id"=123 }の場合、生成される Pub/Sub メッセージには"id"="123"という属性が与えられます。フィールドの値は常に文字列に変換されます。マップ型と構造体型は、構造体内のフィールド型としてサポートされていません。 - マップスキーマの場合、コネクタは各 Key-Value ペアを Pub/Sub メッセージの属性として書き込みます。たとえば、マップが
{"alice"=1,"bob"=2}の場合、結果の Pub/Sub メッセージには"alice"="1"と"bob"="2"の 2 つの属性を持ちます。キーと値は文字列に変換されます。
構造体とマップのスキーマには、次のような追加の動作があります。
必要に応じて、
messageBodyName構成プロパティを設定することで、特定の構造体フィールドまたはマップキーをメッセージ本文に指定できます。フィールドやキーの値はメッセージ本文にByteStringとして格納されます。messageBodyNameを設定しない場合、構造体とマップのスキーマのメッセージ本文は空になります。配列値の場合、コネクタはプリミティブ配列タイプのみをサポートします。配列内の値の順序は、1 つの
ByteStringオブジェクトに連結されます。
Pub/Sub から Kafka への変換
ソースコネクタは、Pub/Sub メッセージを次のように Kafka レコードに変換します。
Kafka レコードキー: デフォルトでは、キーは
nullに設定されています。必要に応じて、kafka.key.attribute構成オプションを設定して、キーとして使用する Pub/Sub メッセージ属性を指定できます。その場合、コネクタはその名前の属性を検索し、レコードキーを属性値に設定します。指定された属性が存在しない場合、レコードキーはnullに設定されます。Kafka レコード値. コネクタはレコード値を次のように書き込みます。
Pub/Sub メッセージにカスタム属性がない場合、コネクタは
value.converterによって指定されたコンバータを使用して、Pub/Sub メッセージ本文をbyte[]型として Kafka レコード値に直接書き込みます。Pub/Sub メッセージにカスタム属性があり、
kafka.record.headersがfalseの場合、コネクタはレコード値に構造体を書き込みます。この構造体は、属性ごとに 1 つのフィールドと、Pub/Sub メッセージ本文(バイトとして保存)を持つ"message"という名前のフィールドを含んでいます。{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }この場合、
structスキーマと互換性のあるvalue.converter(org.apache.kafka.connect.json.JsonConverterなど)を使用する必要があります。Pub/Sub メッセージにカスタム属性があり、
kafka.record.headersがtrueの場合、コネクタは属性を Kafka レコード ヘッダーとして書き込みます。value.converterで指定されたコンバータを使用して、Pub/Sub メッセージ本文をbyte[]タイプとして Kafka レコード値に直接書き込みます。
Kafka レコード ヘッダー。デフォルトでは、
kafka.record.headersをtrueに設定しない限りヘッダーは空です。
構成オプション
Kafka Connect API が提供する構成に加えて、Pub/Sub グループの Kafka コネクタは、Pub/Sub コネクタの構成で説明されているシンクとソースの構成をサポートしています。
サポートの利用
ご不明な点がある場合は、サポート チケットを作成してください。一般的な質問やディスカッションについては、GitHub リポジトリで問題を作成してください。
次のステップ
- Kafka と Pub/Sub の違いについて。
- Pub/Sub グループの Kafka コネクタの詳細を確認する。
- Pub/Sub グループの Kafka コネクタの GitHub リポジトリを確認する。