次の表に、Managed Service for Apache Kafka でサポートされている Kafka Connect コネクタタイプを示します。これらのコネクタを使用して、Apache Kafka をアプリケーションや他の Google Cloud サービスと統合できます。
| コネクタ | 説明 | ユースケース |
|---|---|---|
| MirrorMaker 2.0 | ある Kafka クラスタから別の Kafka クラスタにトピックとデータを複製します。 | データ複製、障害復旧、データ移行 |
| BigQuery シンク | Kafka トピックから BigQuery テーブルにデータをストリーミングします。 | データ ウェアハウジング、分析 |
| Cloud Storage シンク | Kafka トピックから Cloud Storage バケットにデータをストリーミングします。 | データレイクの取り込み、データのアーカイブ |
| Pub/Sub シンク | Kafka トピックから Pub/Sub トピックにデータをストリーミングします。 | サービス統合、リアルタイム通知 |
| Pub/Sub ソース | Pub/Sub サブスクリプションから Kafka トピックにメッセージをストリーミングします。 | リアルタイム データの取り込み、イベント ドリブン アーキテクチャ |
コンバータ
コンバータは、Kafka レコードデータのシリアル化と逆シリアル化を行います。Kafka トピックにある未加工のバイト形式と、Kafka Connect で使用される内部の構造化データ表現の間で変換を行います。
シンクコネクタの場合、コンバータはトピックのワイヤ形式から Kafka Connect の内部データ形式にデータを逆シリアル化します。この形式は、コネクタがターゲット システムに書き込むために使用します。
ソースコネクタの場合、コンバータは Kafka Connect の内部データ形式から Kafka トピックの指定されたワイヤ形式にデータをシリアル化します。
コンバータは、コネクタが外部システムと互換性のある形式で Kafka レコードを読み書きできるようにします。
コネクタを構成するときは、次のプロパティを設定します。
キー コンバータ(
key.converter): Kafka レコードキーのシリアル化と逆シリアル化に使用するコンバータ。値コンバータ(
value.converter): Kafka レコード値のシリアル化と逆シリアル化に使用するコンバータ。
コンバータを指定しない場合、デフォルトのコンバータ タイプは org.apache.kafka.connect.converters.ByteArrayConverter です。これにより、データは未加工のバイト形式で渡されます。
サポートされているコンバータ
Managed Service for Apache Kafka は、次の組み込みコンバータをサポートしています。
| コンバータ | 形式 |
|---|---|
io.confluent.connect.avro.AvroConverter |
Apache Avro |
org.apache.kafka.connect.converters.BooleanConverter |
ブール値 |
org.apache.kafka.connect.converters.ByteArrayConverter |
バイト配列 デフォルトのコンバータ タイプ。2 つのシステム間でメッセージの正確な内容を保持します。 |
org.apache.kafka.connect.converters.DoubleConverter |
Double |
org.apache.kafka.connect.converters.FloatConverter |
浮動小数点数 |
org.apache.kafka.connect.converters.IntegerConverter |
Integer |
org.apache.kafka.connect.json.JsonConverter |
JSON スキーマのない JSON データの場合は、
|
org.apache.kafka.connect.converters.LongConverter |
長い |
org.apache.kafka.connect.converters.ShortConverter |
短い |
org.apache.kafka.connect.storage.StringConverter |
文字列 |
コンバータの選択は、コネクタタイプと Kafka に保存するデータによって異なります。詳細については、特定のコネクタのドキュメントをご覧ください。
ToDo リスト
コネクタは、並行して動作する 1 つ以上のタスク を作成してデータを転送します。コネクタが作成するタスクの数に上限を設定するには、コネクタの tasks.max 構成プロパティを設定します。コネクタが作成するタスクの数は、この値よりも少なくなる可能性があります。
tasks.max の値を大きくすると、スループットは向上しますが、リソース消費量(CPU とメモリ)も増加します。最適な値は、ワークロードと Connect クラスタ ワーカーに割り当てられたリソースによって異なります。シンクコネクタの場合、Kafka トピック パーティションの数も並列処理に影響する可能性があります。
タスクの再起動ポリシー
コネクタのタスク再起動ポリシーを設定できます。これにより、障害発生時の動作が 決まります。コネクタは次のポリシーをサポートしています。
再起動しない。コネクタは失敗したタスクを再起動しません。このポリシーはデフォルトの動作です。デバッグや、エラー後に手動での介入が必要な場合に便利です。
指数バックオフで再起動する。コネクタは、遅延(バックオフ期間 )の後に失敗したタスクを再起動します。遅延は、後続の失敗ごとに指数関数的に増加します。このポリシーは、ほとんどの本番環境ワークロードに推奨されます。
指数バックオフ ポリシーを使用する場合は、最小バックオフと最大バックオフの値も設定します。最小バックオフは 60 秒より長く、最大バックオフは 7, 200 秒未満にする必要があります。
変換と述語
Managed Service for Apache Kafka は、デフォルトの Kafka Connect 変換 と 述語をサポートしています。
変換 を使用すると、Managed Service for Apache Kafka(ソースコネクタの場合)または外部システム(シンクコネクタの場合)に送信される前に、個々のメッセージを変更できます。変換を使用して、センシティブ データのマスキング、タイムスタンプの追加、フィールド名の変更を行うことができます。
述語 を使用すると、特定の条件に基づいてデータをフィルタリングし、メッセージ プロパティに基づいて変換を適用するメッセージを決定できます。
たとえば、DoNotProcess ヘッダーキーを含むメッセージを無視するようにシンクコネクタを構成するには、次の構成を追加します。
transforms=dropMessage
transforms.dropMessage.type=org.apache.kafka.connect.transforms.Filter
transforms.dropMessage.predicate=hasKey
predicates=hasKey
predicates.hasKey.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasKey.name=DoNotProcess
この構成では、次の処理が行われます。
org.apache.kafka.connect.transforms.predicates.HasHeaderKey型のhasKeyという名前の述語を構成します。 この述語は、キーDoNotProcessを持つヘッダーを含むすべてのメッセージと一致します。org.apache.kafka.connect.transforms.Filter型のdropMessageという名前の変換を構成します。 この変換は、構成された述語に一致するすべてのメッセージを削除します。変換を述語
hasKeyにリンクします。これにより、DoNotProcessヘッダーキーが存在するメッセージのみが変換によって削除されます。