コネクタの概要

次の表に、Managed Service for Apache Kafka でサポートされている Kafka Connect コネクタタイプを示します。これらのコネクタを使用して、Apache Kafka をアプリケーションや他の Google Cloud サービスと統合できます。

コネクタ 説明 ユースケース
MirrorMaker 2.0 ある Kafka クラスタから別の Kafka クラスタにトピックとデータを複製します。 データ複製、障害復旧、データ移行
BigQuery シンク Kafka トピックから BigQuery テーブルにデータをストリーミングします。 データ ウェアハウジング、分析
Cloud Storage シンク Kafka トピックから Cloud Storage バケットにデータをストリーミングします。 データレイクの取り込み、データのアーカイブ
Pub/Sub シンク Kafka トピックから Pub/Sub トピックにデータをストリーミングします。 サービス統合、リアルタイム通知
Pub/Sub ソース Pub/Sub サブスクリプションから Kafka トピックにメッセージをストリーミングします。 リアルタイム データの取り込み、イベント ドリブン アーキテクチャ

コンバータ

コンバータは、Kafka レコードデータのシリアル化と逆シリアル化を行います。Kafka トピックにある未加工のバイト形式と、Kafka Connect で使用される内部の構造化データ表現の間で変換を行います。

  • シンクコネクタの場合、コンバータはトピックのワイヤ形式から Kafka Connect の内部データ形式にデータを逆シリアル化します。この形式は、コネクタがターゲット システムに書き込むために使用します。

  • ソースコネクタの場合、コンバータは Kafka Connect の内部データ形式から Kafka トピックの指定されたワイヤ形式にデータをシリアル化します。

コンバータは、コネクタが外部システムと互換性のある形式で Kafka レコードを読み書きできるようにします。

コネクタを構成するときは、次のプロパティを設定します。

  • キー コンバータ(key.converter): Kafka レコードキーのシリアル化と逆シリアル化に使用するコンバータ。

  • 値コンバータ(value.converter): Kafka レコード値のシリアル化と逆シリアル化に使用するコンバータ。

コンバータを指定しない場合、デフォルトのコンバータ タイプは org.apache.kafka.connect.converters.ByteArrayConverter です。これにより、データは未加工のバイト形式で渡されます。

サポートされているコンバータ

Managed Service for Apache Kafka は、次の組み込みコンバータをサポートしています。

コンバータ形式
io.confluent.connect.avro.AvroConverter Apache Avro
org.apache.kafka.connect.converters.BooleanConverter ブール値
org.apache.kafka.connect.converters.ByteArrayConverter

バイト配列

デフォルトのコンバータ タイプ。2 つのシステム間でメッセージの正確な内容を保持します。

org.apache.kafka.connect.converters.DoubleConverter Double
org.apache.kafka.connect.converters.FloatConverter 浮動小数点数
org.apache.kafka.connect.converters.IntegerConverter Integer
org.apache.kafka.connect.json.JsonConverter

JSON

スキーマのない JSON データの場合は、 value.converter.schemas.enable=false も設定します。

org.apache.kafka.connect.converters.LongConverter 長い
org.apache.kafka.connect.converters.ShortConverter 短い
org.apache.kafka.connect.storage.StringConverter 文字列

コンバータの選択は、コネクタタイプと Kafka に保存するデータによって異なります。詳細については、特定のコネクタのドキュメントをご覧ください。

ToDo リスト

コネクタは、並行して動作する 1 つ以上のタスク を作成してデータを転送します。コネクタが作成するタスクの数に上限を設定するには、コネクタの tasks.max 構成プロパティを設定します。コネクタが作成するタスクの数は、この値よりも少なくなる可能性があります。

tasks.max の値を大きくすると、スループットは向上しますが、リソース消費量(CPU とメモリ)も増加します。最適な値は、ワークロードと Connect クラスタ ワーカーに割り当てられたリソースによって異なります。シンクコネクタの場合、Kafka トピック パーティションの数も並列処理に影響する可能性があります。

タスクの再起動ポリシー

コネクタのタスク再起動ポリシーを設定できます。これにより、障害発生時の動作が 決まります。コネクタは次のポリシーをサポートしています。

  • 再起動しない。コネクタは失敗したタスクを再起動しません。このポリシーはデフォルトの動作です。デバッグや、エラー後に手動での介入が必要な場合に便利です。

  • 指数バックオフで再起動する。コネクタは、遅延(バックオフ期間 )の後に失敗したタスクを再起動します。遅延は、後続の失敗ごとに指数関数的に増加します。このポリシーは、ほとんどの本番環境ワークロードに推奨されます。

    指数バックオフ ポリシーを使用する場合は、最小バックオフと最大バックオフの値も設定します。最小バックオフは 60 秒より長く、最大バックオフは 7, 200 秒未満にする必要があります。

変換と述語

Managed Service for Apache Kafka は、デフォルトの Kafka Connect 変換述語をサポートしています。

変換 を使用すると、Managed Service for Apache Kafka(ソースコネクタの場合)または外部システム(シンクコネクタの場合)に送信される前に、個々のメッセージを変更できます。変換を使用して、センシティブ データのマスキング、タイムスタンプの追加、フィールド名の変更を行うことができます。

述語 を使用すると、特定の条件に基づいてデータをフィルタリングし、メッセージ プロパティに基づいて変換を適用するメッセージを決定できます。

たとえば、DoNotProcess ヘッダーキーを含むメッセージを無視するようにシンクコネクタを構成するには、次の構成を追加します。

transforms=dropMessage
transforms.dropMessage.type=org.apache.kafka.connect.transforms.Filter
transforms.dropMessage.predicate=hasKey
predicates=hasKey
predicates.hasKey.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasKey.name=DoNotProcess

この構成では、次の処理が行われます。

  1. org.apache.kafka.connect.transforms.predicates.HasHeaderKey 型の hasKey という名前の述語を構成します。 この述語は、キー DoNotProcess を持つヘッダーを含むすべてのメッセージと一致します。

  2. org.apache.kafka.connect.transforms.Filter 型の dropMessage という名前の変換を構成します。 この変換は、構成された述語に一致するすべてのメッセージを削除します。

  3. 変換を述語 hasKey にリンクします。これにより、DoNotProcess ヘッダーキーが存在するメッセージのみが変換によって削除されます。

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。