Kafka Connect は、Kafka デベロッパーにとって推奨されるデータ統合ツールです。データベース、メッセージ キュー、ファイル システムなどの外部システムと Kafka を接続するためのフレームワークを提供します。
Kafka Connect は、 Google Cloudによって検証および維持される、厳選された組み込みコネクタ プラグインのセットを提供します。これらのコネクタ プラグインは自動的にパッチが適用され、アップグレードされるため、メンテナンスが簡素化され、互換性が確保されます。また、Google Cloud には、パイプラインの健全性を維持するためのモニタリングとロギングが組み込まれています。
Kafka Connect API は、Google Cloud Managed Service for Apache Kafka サービスの一部として提供されます。これらの API には managedkafka.googleapis.com を介してアクセスでき、 Google Cloud コンソールとクライアント ライブラリに統合されています。Kafka Connect を管理するには、 Google Cloud コンソール、gcloud CLI、Managed Kafka API、Cloud クライアント ライブラリ、または Terraform を使用します。
Kafka Connect のユースケース
Kafka Connect は、Managed Service for Apache Kafka クラスタと他のさまざまなシステム間のデータ統合をサポートしています。主なユースケースは次のとおりです。
既存の Kafka デプロイを Managed Service for Apache Kafka に移行します。
障害復旧のために、Managed Service for Apache Kafka クラスタを別のリージョンに複製します。
Managed Service for Apache Kafka から BigQuery、Cloud Storage、Pub/Sub にデータをストリーミングします。
Kafka Connect の用語
以降のセクションでは、Kafka Connect の特定の主要コンポーネントについて説明します。
接続クラスタ
Connect クラスタは、事前パッケージ化されたコネクタ プラグインと構成を含む Kafka Connect の分散型デプロイです。各 Connect クラスタは、プライマリ Managed Service for Apache Kafka クラスタに関連付けられています。このプライマリ クラスタには、Connect クラスタで実行されているコネクタの状態が保存されます。
通常、プライマリ Managed Service for Apache Kafka クラスタは、関連付けられた Connect クラスタで実行されているすべてのソースコネクタのターゲットと、すべてのシンクコネクタのソースとしても機能します。
1 つの Managed Service for Apache Kafka クラスタに複数の Connect クラスタを設定できます。MirrorMaker 2.0 を実行している場合、Connect クラスタは非プライマリの Managed Service for Apache Kafka クラスタまたはセルフマネージド Kafka クラスタに接続して、トピックデータの読み取りまたは書き込みを行うことができます。このプロセスにより、異なるクラスタ間でトピックのレプリケーションが可能になります。
リソースモデルの観点から見ると、Connect クラスタは Managed Service for Apache Kafka クラスタとは別のリソースです。
ウェブサイトのトラフィック データを保存する Managed Service for Apache Kafka クラスタがあるとします。このデータを分析のために BigQuery にストリーミングしたいと考えています。Connect クラスタを作成し、BigQuery シンク コネクタを使用して、Kafka トピックから BigQuery にデータを移動できます。この Connect クラスタは、プライマリ クラスタとして Managed Service for Apache Kafka クラスタに関連付けられています。
コネクタ プラグイン
コネクタを作成するためのソフトウェア パッケージ。コネクタのロジックを定義するコードと考えてください。
コネクタは、ソース コネクタまたはシンク コネクタにできます。ソース コネクタは、ソースから Managed Service for Apache Kafka クラスタにデータを書き込みます。
シンク コネクタは、Managed Service for Apache Kafka クラスタからシンクにデータを書き込みます。
Managed Service for Apache Kafka は、コネクタの作成用に構成できる複数のタイプの組み込みコネクタ プラグインをサポートしています。これらのコネクタは、Pub/Sub や BigQuery などの一般的なサービスとの統合を提供します。これらのコネクタ プラグインは次のとおりです。
BigQuery シンクコネクタ プラグイン
Cloud Storage シンクコネクタ プラグイン
Pub/Sub ソースコネクタ プラグイン
Pub/Sub シンクコネクタ プラグイン
MirrorMaker 2.0 コネクタ プラグイン
コネクタ
コネクタは、特定の Connect クラスタ内のコネクタ プラグインの実行中のインスタンスです。同じコネクタ プラグインから複数のコネクタを作成できます。各コネクタには独自の構成を設定できます。構成の例としては、さまざまな認証の詳細や運用設定があります。コネクタは、Connect クラスタ内でデプロイ、構成、管理されます。起動、停止、一時停止、再起動が可能で、構成を更新できます。
コネクタのコンポーネントについては、次のセクションで説明します。
コンバージョンに至ったユーザー数
コンバータは、シリアル化とシリアル化解除を担当する Kafka Connect 内の重要なコンポーネントです。これらは、Avro や JSON 形式などの Kafka トピックにある未加工のバイト ワイヤ形式と、Kafka Connect の内部の構造化データ表現との間でデータを変換します。
コンバーターの役割
シンク コネクタの場合、コンバータはトピックのワイヤー形式から Kafka Connect の内部構造化データ表現にデータを逆シリアル化します。コネクタは、この表現を使用してターゲット システムに書き込みます。
ソースコネクタの場合、コンバータは、コネクタによって提供される Kafka Connect の内部構造化データ表現から、Kafka トピックの指定されたワイヤ形式にデータをシリアル化します。
この内部形式は共通の表現として機能し、さまざまな中間処理ステップを可能にします。これらのステップには、フィルタ、述語、変換、コンバータなどのプリミティブが含まれており、これらはすべてこの統一された内部形式で動作します。抽象的な内部形式を使用することで、これらの中間ステップのロジックは特定の入力または出力データ形式に依存しません。
コンバータは、データを単に渡すだけでなく、データとやり取りする必要がある場合に必要になります。具体的には、述語や変換などの中間処理ステップをきめ細かい構造認識の方法で実行する必要がある場合に、コンバータが必要になります。
バイト文字列(JSON の場合も含む)を操作せずにソースから Kafka に移動するだけの場合は、コンバータは必要ありません。
コネクタ構成でキーと値のコンバータを指定しない場合、コネクタはデフォルトの ByteArrayConverter 値を使用します。org.apache.kafka.connect.converters.ByteArrayConverter 値はデータに変換を適用せず、元の形式でデータを渡します。
サポートされているコンバータ
このリリースでは、 Google Cloud は次の組み込みコンバータをサポートしています。
org.apache.kafka.connect.converters.ByteArrayConverter: データをバイト配列に変換します。これがデフォルトのコンバータです。基になる未加工のバイトとしてコネクタを介してデータを渡します。org.apache.kafka.connect.json.JsonConverter: データを JSON 形式に変換します。org.apache.kafka.connect.storage.StringConverter: データを String 形式に変換します。org.apache.kafka.connect.converters.ByteArrayConverter: データをバイト配列に変換します。org.apache.kafka.connect.converters.DoubleConverter: データを Double 形式に変換します。org.apache.kafka.connect.converters.FloatConverter: データを Float 形式に変換します。org.apache.kafka.connect.converters.IntegerConverter: データを整数形式に変換します。org.apache.kafka.connect.converters.LongConverter: データを Long 形式に変換します。org.apache.kafka.connect.converters.ShortConverter: データを Short 形式に変換します。org.apache.kafka.connect.converters.BooleanConverter: データをブール値形式に変換します。io.confluent.connect.avro.AvroConverter: データを Apache Avro 形式に変換します。
このリリースでは、Kafka Connect は Schema Registry を使用したリモート スキーマに対する検証をサポートしていません。
各コネクタの優先コンバータについては、特定のコネクタのドキュメントをご覧ください。
デフォルトのコンバータ構成
サポートされているすべてのコネクタのデフォルトのキーと値のコンバータは org.apache.kafka.connect.json.JsonConverter です。
コネクタを構成する際は、Kafka メッセージのキーと値の両方に適切なコンバータを指定する必要があります。たとえば、JSON データを扱う場合は、JsonConverter を使用します。データが文字列形式の場合は、StringConverter を使用します。
一般的な構成には、次のようなものがあります。
tasks.max: このコネクタ用に作成するタスクの最大数。これにより、コネクタの並列処理が制御されます。タスク数を増やすとスループットは向上しますが、リソース消費量(CPU とメモリ)も増加します。最適な値は、ワークロードと Connect クラスタ ワーカーに割り当てられたリソース、シンク コネクタの場合は Kafka トピック パーティションの数によって異なります。value.converter: メッセージの値をシリアル化して Cloud Storage バケットに送信するために使用するコンバータ。一般的なコンバータは次のとおりです。org.apache.kafka.connect.json.JsonConverter: JSON データの場合。このコンバータをプレーン JSON(スキーマなし)で使用する場合は、value.converter.schemas.enable=falseを設定する必要があることがよくあります。org.apache.kafka.connect.converters.ByteArrayConverter: 2 つのシステム間でメッセージの正確な内容を保持するため。org.apache.kafka.connect.storage.StringConverter: プレーン テキスト文字列の場合。
key.converter: メッセージのキーのシリアル化に使用するコンバータ。value.converterと同じコンバータ オプションが適用されます。メッセージにキーがない場合は、多くの場合org.apache.kafka.connect.storage.StringConverterを使用できます。value.converter.schemas.enable: シンク コネクタの場合、org.apache.kafka.connect.json.JsonConverterを使用するときにこれをtrueに設定すると、Kafka Connect は受信 Kafka メッセージに埋め込まれたスキーマを探して使用します。false(デフォルト)に設定すると、Kafka Connect は、スキーマが埋め込まれていないプレーンな JSON 形式のデータを想定します。
変換(省略可)
変換を使用すると、データ パイプラインでデータの操作や拡充を行うことができます。変換を使用すると、Managed Service for Apache Kafka(ソースコネクタの場合)または外部システム(シンクコネクタの場合)に送信される前に、個々のメッセージを変更できます。変換を使用して、機密データをマスクしたり、タイムスタンプを追加したり、フィールド名を変更したりできます。
述語(省略可)
述語を使用すると、特定の条件に基づいてデータをフィルタリングできます。述語は変換を適用するためのフィルタとして機能し、メッセージ プロパティに基づいて変換を適用するメッセージを決定します。
Google Cloud内で Kafka Connect を管理する
Kafka Connect を使用すると、基盤となるインフラストラクチャと運用上の複雑さを Google Cloudが処理するため、コネクタのデプロイに集中できます。 Google Cloud で自動化される内容と構成可能な内容は次のとおりです。
Kafka Connect サービスは、次の処理を自動化します。
Kafka Connect ワーカーのプロビジョニング: Connect クラスタを作成すると、Kafka Connect サービスは Kubernetes にワーカー クラスタを自動的にプロビジョニングします。
ネットワーキング: Kafka Connect サービスは、ワーカー、Managed Service for Apache Kafka ブローカー、外部システム間の通信を可能にするようにネットワークを構成します。場合によっては、既存のネットワーク設定を変更する必要があります。
ゾーンの復元力: Kafka Connect サービスは、ワーカーを 3 つ以上のゾーンに分散します。これにより、ゾーンが停止した場合でもデータ処理を続行できます。
認証: Kafka Connect サービスは、Kafka ブローカーとの認証も構成し、安全な接続を確保します。
ロールアウトとアップグレード: Kafka Connect サービスは、ワーカー構成の変更、バージョンのアップグレード、セキュリティ パッチを管理し、デプロイが常に最新の状態になるようにします。
Kafka Connect サービス内では、次の構成を行うことができます。
容量とネットワークの制約: パフォーマンスと費用を最適化するために、リソースの上限とネットワーク構成を定義します。
モニタリングとロギング: コネクタのログと指標にアクセスして、パフォーマンスをモニタリングし、問題をトラブルシューティングします。
コネクタのライフサイクル管理: 必要に応じてコネクタを一時停止、再開、再起動、停止して、データ パイプラインを管理します。
制限事項
Kafka Connect サービスは、プライマリ Kafka クラスタとして Managed Service for Apache Kafka クラスタのみをサポートしています。プライマリ クラスタは、Kafka Connect クラスタがメタデータを書き込むクラスタです。
このサービスでは、カスタム コネクタ プラグインを Kafka Connect クラスタにアップロードすることはできません。