このページでは、Kafka コネクタを使用して Spanner の変更ストリーム データの使用と転送を行う方法について説明します。
基本コンセプト
以下では、Kafka コネクタの基本コンセプトについて説明します。
Debezium
Debezium は、変更データ キャプチャ用の低レイテンシ データ ストリーミング プラットフォームを提供するオープンソース プロジェクトです。
Kafka コネクタ
Kafka コネクタは、Spanner API の抽象化を提供し、Spanner 変更ストリームを Kafka に公開します。このコネクタを使用すると、Spanner API を直接使用する際に必要な変更ストリーム パーティションのライフサイクルを管理する必要がなくなります。
Kafka コネクタは、データ変更レコードの mod ごとに変更イベントを生成し、変更ストリームで追跡されるテーブルごとに、変更イベント レコードを個別の Kafka トピックにダウンストリームします。データ変更レコードの mod は、キャプチャされた単一の変更(挿入、更新、または削除)を表します。1 つのデータ変更レコードに複数の mod を含めることができます。
Kafka コネクタの出力
Kafka コネクタは、変更ストリーム レコードを個別の Kafka トピックに直接転送します。出力トピック名は connector_name.table_name にする必要があります。トピックが存在しない場合、Kafka コネクタは自動的にその名前でトピックを作成します。
トピック ルーティング変換を構成して、指定したトピックにレコードを再ルーティングすることもできます。トピック ルーティングを使用する場合は、低ウォーターマーク機能を無効にします。
レコードの順序付け
Kafka トピックのレコードは、主キーごとに commit タイムスタンプで順序付けされます。異なる主キーに属するレコードに順序の保証はありません。同じ主キーを持つレコードは、同じ Kafka トピック パーティションに保存されます。トランザクション全体を処理する場合は、データ変更レコードの server_transaction_id フィールドと number_of_records_in_transaction フィールドを使用して、Spanner トランザクションを組み立てることもできます。
変更イベント
Kafka コネクタは、INSERT、UPDATE、DELETE オペレーションごとにデータ変更イベントを生成します。各イベントには、変更された行のキーと値が含まれます。
Kafka Connect コンバータを使用すると、Protobuf、AVRO、JSON、JSON Schemaless 形式でデータ変更イベントを生成できます。スキーマを生成する Kafka Connect コンバータを使用する場合、イベントにはキーと値の個別のスキーマが含まれます。それ以外の場合、イベントにはキーと値のみが含まれます。
キーのスキーマは変更されません。値のスキーマは、コネクタの開始時刻以降に変更ストリームが追跡したすべての列の集合体です。
JSON イベントを生成するようにコネクタを構成すると、出力変更イベントには次の 5 つのフィールドが含まれます。
最初の
schemaフィールドは、Spanner キースキーマを記述する Kafka Connect スキーマを示します。最初の
payloadフィールドには、前のschemaフィールドで説明した構造があり、変更された行のキーが含まれています。2 番目の
schemaフィールドは、変更された行のスキーマを記述する Kafka Connect スキーマを示します。2 番目の
payloadフィールドは、前のschemaフィールドで説明されている構造を持ち、変更された行の実際のデータが含まれています。sourceフィールドは、イベントのソース メタデータを記述する必須フィールドです。
データ変更イベントの例を次に示します。
{
// The schema for the Spanner key.
"schema": {
"type": "struct",
"name": "customers.Key",
"optional": false,
"fields": [
{
"type": "int64",
"optional": "false"
"field": "false"
}
]
},
// The value of the Spanner key.
"payload": {
"id": "1"
},
// The schema for the payload, which contains the before and after values
// of the changed row. The schema for the payload contains all the
// columns that the change stream has tracked since the connector start
// time.
"schema": {
"type": "struct",
"fields": [
{
// The schema for the before values of the changed row.
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "first_name"
}
],
"optional": true,
"name": "customers.Value",
"field": "before"
},
{
// The schema for the after values of the changed row.
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
}
],
"optional": true,
"name": "customers.Value",
"field": "after"
},
{
// The schema for the source metadata for the event.
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "project_id"
},
{
"type": "string",
"optional": false,
"field": "instance_id"
},
{
"type": "string",
"optional": false,
"field": "database_id"
},
{
"type": "string",
"optional": false,
"field": "change_stream_name"
},
{
"type": "string",
"optional": true,
"field": "table"
}
{
"type": "string",
"optional": true,
"field": "server_transaction_id"
}
{
"type": "int64",
"optional": true,
"field": "low_watermark"
}
{
"type": "int64",
"optional": true,
"field": "read_at_timestamp"
}
{
"type": "int64",
"optional": true,
"field": "number_of_records_in_transaction"
}
{
"type": "string",
"optional": true,
"field": "transaction_tag"
}
{
"type": "boolean",
"optional": true,
"field": "system_transaction"
}
{
"type": "string",
"optional": true,
"field": "value_capture_type"
}
{
"type": "string",
"optional": true,
"field": "partition_token"
}
{
"type": "int32",
"optional": true,
"field": "mod_number"
}
{
"type": "boolean",
"optional": true,
"field": "is_last_record_in_transaction_in_partition"
}
{
"type": "int64",
"optional": true,
"field": "number_of_partitions_in_transaction"
}
],
"optional": false,
"name": "io.debezium.connector.spanner.Source",
"field": "source"
},
]
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "connector_name.customers.Envelope"
},
"payload": {
// The values of the row before the event.
"before": null,
// The values of the row after the event.
"after": {
"id": 1,
"first_name": "Anne",
}
},
// The source metadata.
"source": {
"version": "{debezium-version}",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "customers",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "c",
"ts_ms": 1559033904863 //
}
低ウォーターマーク
低ウォーターマークは、Kafka コネクタが T 未満のタイムスタンプすべてのイベントをストリーミングして Kafka トピックに公開されることが保証されている時間 T を表します。
Kafka コネクタで低ウォーターマークを有効にするには、gcp.spanner.low-watermark.enabled パラメータを使用します。このパラメータはデフォルトで無効になっています。低ウォーターマークが有効になっている場合、変更ストリーム データ変更レコードの low_watermark フィールドには、Kafka コネクタの現在の低ウォーターマークのタイムスタンプが入力されます。
レコードが生成されていない場合、Kafka コネクタは、コネクタによって検出された Kafka 出力トピックに定期的にウォーターマーク ハートビートを送信します。
これらのウォーターマーク ハートビートは、low_watermark フィールドを除いて空のレコードです。その後、低ウォーターマークを使用して時間ベースの集計を実行できます。たとえば、低ウォーターマークを使用して、主キー全体で commit タイムスタンプ別にイベントを並べ替えることができます。
メタデータ トピック
Kafka コネクタと Kafka Connect フレームワークは、コネクタ関連の情報を格納するために、複数のメタデータ トピックを作成します。これらのメタデータ トピックの構成やコンテンツを変更することはおすすめしません。
メタデータのトピックは次のとおりです。
_consumer_offsets: Kafka によって自動的に作成されるトピック。Kafka コネクタで作成されたコンシューマーのコンシューマー オフセットを保存します。_kafka-connect-offsets: Kafka Connect によって自動的に作成されるトピック。コネクタのオフセットを保存します。_sync_topic_spanner_connector_connectorname: コネクタによって自動的に作成されるトピック。変更ストリーム パーティションに関するメタデータを格納します。_rebalancing_topic_spanner_connector_connectorname: コネクタによって自動的に作成されるトピック。コネクタタスクの存続を確認するために使用されます。_debezium-heartbeat.connectorname: Spanner 変更ストリームのハートビートの処理に使用されるトピック。
Kafka コネクタ ランタイム
以下では、Kafka コネクタのランタイムについて説明します。
スケーラビリティ
Kafka コネクタは水平方向にスケーラブルで、複数の Kafka Connect ワーカーに分散された 1 つ以上のタスクで実行されます。
メッセージ配信の保証
Kafka コネクタは、「1 回以上」の配信保証をサポートしています。
フォールト トレラント
Kafka コネクタは、障害に対して耐性があります。変更を読み取ってイベントを生成する際に、Kafka コネクタは、処理された直近の commit タイムスタンプを変更ストリームのパーティションごとに記録します。Kafka コネクタがなんらかの理由(通信障害、ネットワークの問題、ソフトウェア障害など)で停止した場合、再起動後、Kafka コネクタは最後に停止したところからレコードのストリーミングを再開します。
Kafka コネクタは、Kafka コネクタの開始タイムスタンプで情報スキーマを読み取り、スキーマ情報を取得します。デフォルトでは、Spanner はバージョン保持期間(デフォルトは 1 時間)より前の読み取りタイムスタンプで情報スキーマを読み取ることができません。1 時間より前のコネクタを開始する場合は、データベースのバージョン保持期間を長くする必要があります。
Kafka コネクタを設定する
変更ストリームを作成する
変更ストリームの作成方法の詳細については、変更ストリームの作成をご覧ください。次のステップに進むには、変更ストリームが構成された Spanner インスタンスが必要です。
各データ変更イベントで変更された列と変更されていない列の両方を返す場合は、値キャプチャ タイプ NEW_ROW を使用します。詳細については、値キャプチャ タイプをご覧ください。
Kafka コネクタ JAR をインストールする
Zookeeper、Kafka、Kafka Connect がインストールされている場合、Kafka コネクタをデプロイする残りのタスクでは、Connectors プラグイン アーカイブをダウンロードし、JAR ファイルを Kafka Connect 環境に解凍して、JAR ファイルを含むディレクトリを Kafka Connect の plugin.path に追加します。新しい JAR ファイルを取得するには、Kafka Connect プロセスを再起動する必要があります。
不変コンテナを使用している場合は、Zookeeper、Kafka、Kafka Connect の Debezium のコンテナ イメージからイメージを pull できます。Kafka Connect イメージには、Spanner コネクタがプリインストールされています。
Debezium ベースの Kafka コネクタ JAR をインストールする方法については、Debezium のインストールをご覧ください。
Kafka コネクタを構成する
次の例は、インスタンス test-instance とプロジェクト test-project のデータベース users の changeStreamAll という変更ストリームに接続する Kafka コネクタの構成の例です。
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
この構成には、次のものが含まれます。
Kafka Connect サービスに登録されたコネクタの名前。
この Spanner コネクタクラスの名前。
プロジェクト ID
Spanner インスタンス ID。
Spanner データベース ID。
変更ストリーム名。
サービス アカウント キーの JSON オブジェクト。
(省略可)使用する Spanner データベース ロール。
タスクの最大数。
コネクタ プロパティの一覧については、Kafka コネクタの構成プロパティをご覧ください。
コネクタ構成を Kafka Connect に追加する
Spanner コネクタの実行を開始するには:
Spanner コネクタの構成を作成します。
Kafka Connect REST API を使用して、そのコネクタ構成を Kafka Connect クラスタに追加します。
この構成は、POST コマンドを使用して、実行中の Kafka Connect サービスに送信できます。デフォルトでは、Kafka Connect サービスはポート 8083 で実行されます。サービスは構成を記録し、Spanner データベースに接続して変更イベント レコードを Kafka トピックにストリーミングするコネクタタスクを開始します。
POST コマンドの例を示します。
POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
"name": "spanner-connector"
"config": {
"connector.class": "io.debezium.connector.spanner.SpannerConnector",
"gcp.spanner.project.id": "test-project",
"gcp.spanner.instance.id": "test-instance",
"gcp.spanner.database.id": "users",
"gcp.spanner.change.stream": "changeStreamAll",
"gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
"heartbeat.interval.ms": "100",
"tasks.max": "10"
}
}成功したレスポンスの例:
HTTP/1.1 201 Created
Content-Type: application/json
{
"name": "spanner-connector",
"config": {
"connector.class": "io.debezium.connector.spanner.SpannerConnector",
"gcp.spanner.project.id": "test-project",
"gcp.spanner.instance.id": "test-instance",
"gcp.spanner.database.id": "users",
"gcp.spanner.change.stream": "changeStreamAll",
"gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
"heartbeat.interval.ms": "100",
"tasks.max": "10"
},
"tasks": [
{ "connector": "spanner-connector", "task": 1 },
{ "connector": "spanner-connector", "task": 2 },
{ "connector": "spanner-connector", "task": 3 }
]
}Kafka コネクタの構成を更新する
コネクタ構成を更新するには、同じコネクタ名を使用して、実行中の Kafka Connect サービスに PUT コマンドを送信します。
前のセクションの構成でコネクタが実行されているとします。PUT コマンドの例を示します。
PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
"connector.class": "io.debezium.connector.spanner.SpannerConnector",
"gcp.spanner.project.id": "test-project",
"gcp.spanner.instance.id": "test-instance",
"gcp.spanner.database.id": "users",
"gcp.spanner.change.stream": "changeStreamAll",
"gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
"heartbeat.interval.ms": "100",
"tasks.max": "10"
}成功したレスポンスの例:
HTTP/1.1 200 OK
Content-Type: application/json
{
"connector.class": "io.debezium.connector.spanner.SpannerConnector",
"tasks.max": "10",
"gcp.spanner.project.id": "test-project",
"gcp.spanner.instance.id": "test-instance",
"gcp.spanner.database.id": "users",
"gcp.spanner.change.stream": "changeStreamAll",
"gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
"heartbeat.interval.ms": "100",
"tasks.max": "10"
}Kafka コネクタを停止する
コネクタを停止するには、同じコネクタ名を使用して、実行中の Kafka Connect サービスに DELETE コマンドを送信します。
前のセクションの構成でコネクタが実行されているとします。DELETE コマンドの例を示します。
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
成功したレスポンスの例:
HTTP/1.1 204 No Content
Kafka コネクタをモニタリングする
Kafka コネクタは、標準の Kafka Connect と Debezium の指標に加えて、独自の指標をエクスポートします。
MilliSecondsLowWatermark: コネクタタスクの現在の下限値(ミリ秒単位)。低ウォーターマークは、コネクタが T 未満のタイムスタンプのすべてのイベントをストリーミングすることが保証されている時間 T を表します。MilliSecondsLowWatermarkLag: 低ウォーターマークの現在の時刻に対する遅延(ミリ秒単位)。タイムスタンプが T より小さいすべてのイベントがストリーミングされています。LatencyLowWatermark<Variant>MilliSeconds: 低ウォーターマークの現在の時刻に対する遅延(ミリ秒単位)。P50、P95、P99、平均、最小、最大値のバリエーションがあります。LatencySpanner<Variant>MilliSeconds: Spanner-commit-timestamp-to-connector-read の遅延。P50、P95、P99、平均、最小、最大値のバリエーションがあります。LatencyReadToEmit<Variant>MilliSeconds: Spanner-read-timestamp-to-connector-emit の遅延。P50、P95、P99、平均、最小、最大値のバリエーションがあります。LatencyCommitToEmit<Variant>tMilliSeconds: Spanner-commit-timestamp-to-connector-emit の遅延。P50、P95、P99、平均、最小、最大値のバリエーションがあります。LatencyCommitToPublish<Variant>MilliSeconds: Spanner-commit-timestamp から Kafka-publish-timestamp までの遅延。P50、P95、P99、平均、最小、最大値のバリエーションがあります。NumberOfChangeStreamPartitionsDetected: 現在のコネクタタスクで検出されたパーティションの合計数。NumberOfChangeStreamQueriesIssued: 現在のタスクによって発行された変更ストリーム クエリの合計数。NumberOfActiveChangeStreamQueries: 現在のコネクタタスクで検出されたアクティブな変更ストリーム クエリの数。SpannerEventQueueCapacity: 変更ストリーム クエリから受信した要素を保存するキューであるStreamEventQueueの合計容量。SpannerEventQueueCapacity: 残りのStreamEventQueue容量。TaskStateChangeEventQueueCapacity: コネクタで発生したイベントを保存するキューであるTaskStateChangeEventQueueの合計容量。RemainingTaskStateChangeEventQueueCapacity: 残りのTaskStateChangeEventQueue容量。NumberOfActiveChangeStreamQueries: 現在のコネクタタスクで検出されたアクティブな変更ストリーム クエリの数。
Kafka コネクタの構成プロパティ
コネクタに必要な構成プロパティは次のとおりです。
name: コネクタの一意の名前。同じ名前で再度登録しようとすると、失敗します。このプロパティは、すべての Kafka Connect コネクタで必要です。connector.class: コネクタの Java クラスの名前。Kafka コネクタには常にio.debezium.connector.spanner.SpannerConnectorの値を使用します。tasks.max: このコネクタ用に作成するタスクの最大数。gcp.spanner.project.id: プロジェクト IDgcp.spanner.instance.id: Spanner インスタンス IDgcp.spanner.database.id: Spanner データベース IDgcp.spanner.change.stream: Spanner 変更ストリーム名gcp.spanner.credentials.json: サービス アカウント キーの JSON オブジェクト。gcp.spanner.credentials.path: サービス アカウント キーの JSON オブジェクトへのファイルパス。上記のフィールドが指定されていない場合は必須です。gcp.spanner.database.role: 使用する Spanner データベース ロール。これは、変更ストリームがきめ細かいアクセス制御で保護されている場合にのみ必要です。データベースのロールには、変更ストリームに対するSELECT権限と、変更ストリームの読み取り機能に対するEXECUTE権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御をご覧ください。
次の高度な構成プロパティには、ほとんどの状況で機能するデフォルト値が設定されているため、コネクタの構成で指定する必要はほとんどありません。
gcp.spanner.low-watermark.enabled: コネクタで低ウォーターマークが有効になっているかどうかを示します。デフォルトは false です。gcp.spanner.low-watermark.update-period.ms: 低ウォーターマークが更新される間隔。デフォルトは 1,000 ミリ秒です。heartbeat.interval.ms: Spanner ハートビート間隔。デフォルトは 300,000(5 分)です。gcp.spanner.start.time: コネクタの開始時刻。デフォルトは現在の時刻です。gcp.spanner.end.time: コネクタの終了時間。デフォルトは無限大です。tables.exclude.list: 変更イベントを除外するテーブル。デフォルトは空です。tables.include.list: 変更イベントを含めるテーブル。入力されていない場合、すべてのテーブルが対象となります。デフォルトは空です。gcp.spanner.stream.event.queue.capacity: Spanner イベントキューの容量。デフォルトは 10,000 です。connector.spanner.task.state.change.event.queue.capacity: タスク状態変更イベントキューの容量。デフォルトは 1,000 です。connector.spanner.max.missed.heartbeats: 例外がスローされる前に変更ストリーム クエリでハートビートが欠落する最大回数。デフォルトは 10 です。scaler.monitor.enabled: タスクの自動スケーリングが有効かどうかを示します。デフォルトは false です。tasks.desired.partitions: タスクあたりの変更ストリーム パーティションの推奨数。このパラメータは、タスクの自動スケーリングに必要です。デフォルトは 2 です。tasks.min: タスクの最小数。このパラメータは、タスクの自動スケーリングに必要です。デフォルトは 1 です。connector.spanner.sync.topic: 同期トピックの名前。タスク間の通信の保存に使用される内部コネクタ トピックです。名前を指定しなかった場合、デフォルトは_sync_topic_spanner_connector_connectornameです。connector.spanner.sync.poll.duration: 同期トピックのポーリング期間。デフォルトは 500 ミリ秒です。connector.spanner.sync.request.timeout.ms: 同期トピックへのリクエストのタイムアウト。デフォルトは 5,000 ミリ秒です。connector.spanner.sync.delivery.timeout.ms: 同期トピックへの公開のタイムアウト。デフォルトは 15,000 ミリ秒です。connector.spanner.sync.commit.offsets.interval.ms: 同期トピックのオフセットが commit される間隔。デフォルトは 60,000 ミリ秒です。connector.spanner.sync.publisher.wait.timeout: メッセージが同期トピックに公開される間隔。デフォルトは 5 ミリ秒です。connector.spanner.rebalancing.topic: 再調整トピックの名前。再調整トピックは、タスクの存続を確認するために使用される内部コネクタ トピックです。名前を指定しなかった場合、デフォルトは_rebalancing_topic_spanner_connector_connectornameです。connector.spanner.rebalancing.poll.duration: 再調整トピックのポーリング期間。デフォルトは 5,000 ミリ秒です。connector.spanner.rebalancing.commit.offsets.timeout: 再調整トピックのオフセットを commit するタイムアウト。デフォルトは 5,000 ミリ秒です。connector.spanner.rebalancing.commit.offsets.interval.ms: 同期トピックのオフセットが commit される間隔。デフォルトは 60,000 ミリ秒です。connector.spanner.rebalancing.task.waiting.timeout: タスクが再調整イベントを処理するまで待機する時間。デフォルトは 1,000 ミリ秒です。
構成可能なコネクタ プロパティの詳細なリストについては、GitHub リポジトリをご覧ください。
制限事項
コネクタは、ストリーミング スナップショット イベントをサポートしていません。
コネクタでウォーターマークが有効になっている場合、Debezium トピックのルーティング変換を構成することはできません。