MirrorMaker 2.0 コネクタは、Kafka クラスタ間でデータを複製します。MirrorMaker 2.0 コネクタを使用すると、Kafka クラスタ間で障害復旧を実行し、Kafka ベースのアプリケーションで高可用性とフォールト トレランスを実現できます。
MirrorMaker 2.0 コネクタは、2 つの Managed Service for Apache Kafka クラスタ間、または Managed Service for Apache Kafka クラスタと外部またはセルフマネージド Kafka クラスタ間の接続を確立できます。
MirrorMaker 2.0 コネクタのユースケースには、次のようなものがあります。
データの移行。Kafka ワークロードを新しい Managed Service for Apache Kafka クラスタに移行します。
障害復旧。障害が発生した場合にビジネスの継続性を確保するために、バックアップ クラスタを作成します。
データ集計。複数の Kafka クラスタのデータを中央の Managed Service for Apache Kafka クラスタに統合して、分析を実行します。
MirrorMaker 2.0 コネクタのタイプ
Managed Service for Apache Kafka は、次の MirrorMaker 2.0 コネクタ タイプを提供します。
MirrorMaker 2.0 ソースコネクタ
MirrorMaker 2.0 ソースコネクタは、Kafka クラスタ(ソース)間でトピックとデータの複製を行います。
このコネクタは、次の移行シナリオで使用します。
外部またはセルフマネージド Kafka クラスタから Managed Service for Apache Kafka クラスタにデータを複製または移行します。
Managed Service for Apache Kafka クラスタから外部またはセルフマネージド Kafka クラスタにデータを複製または移行します。
障害復旧と高可用性の要件を満たすために、複数のリージョン間で Kafka データを複製します。
Kafka クラスタ間の基本的なデータ複製には、MirrorMaker 2.0 ソースコネクタを単独で使用できます。他の MirrorMaker 2.0 コネクタは、データ複製用の追加機能を提供します。
MirrorMaker 2.0 チェックポイント コネクタ
MirrorMaker 2.0 チェックポイント コネクタは、Kafka クラスタ間でコンシューマー オフセットをコピーします。コンシューマー オフセットは、パーティション内で最後に正常に消費されたメッセージを示します。オフセットを複製すると、ターゲット クラスタのコンシューマーがソース クラスタと同じポイントから処理を再開できます。
このコネクタを使用すると、次のユースケースが可能になります。
移行元クラスタから移行先クラスタへの切り替え時のダウンタイムを最小限に抑えます。
クラスタ間で一貫したコンシューマー状態を提供することで、シームレスなフェイルオーバーを実現します。
データをターゲット クラスタに移動するときに、コンシューマーの進行状況を保持します。
MirrorMaker 2.0 ハートビート コネクタ
MirrorMaker 2.0 ハートビート コネクタは、Kafka クラスタで定期的なハートビート メッセージを生成します。コネクタは、これらのメッセージをクラスタ内の専用トピックに書き込みます。通常、このトピックは heartbeats という名前です。
MirrorMaker 2.0 ハートビート コネクタを構成したら、MirrorMaker 2.0 ソース コネクタを使用して heartbeats トピックをターゲット クラスタに複製できます。複製されたハートビートをモニタリングすることで、次のユースケースを実装できます。
クラスタ間のデータ レプリケーションのステータスとパフォーマンスをモニタリングします。
他のデータが生成されていない場合でも、クラスタ間の接続とデータフローを確認します。
Cloud Monitoring でアラートを構成して、ハートビートのレプリケーションが停止した場合に通知を受け取れるようにします。
Heartbeat コネクタを単独で使用する場合、レプリケーションは自動的にモニタリングされません。heartbeats トピックを複製し、ターゲット クラスタに到着するハートビート メッセージを監視する必要があります。
MirrorMaker 2.0 のクラスタロールについて
MirrorMaker 2.0 を構成する際は、Kafka クラスタが果たすさまざまな役割を理解することが重要です。
プライマリ クラスタ: Managed Service for Apache Kafka のコンテキストでは、Kafka Connect クラスタが直接接続されている Managed Service for Apache Kafka クラスタです。Connect クラスタは、MirrorMaker 2.0 コネクタ インスタンスをホストします。
セカンダリ クラスタ: レプリケーションに関与する他の Kafka クラスタです。別の Managed Service for Apache Kafka クラスタまたは外部クラスタにできます。たとえば、Compute Engine、GKE、オンプレミス、別のクラウドでセルフマネージドできます。
ソースクラスタ: MirrorMaker 2.0 がデータを複製する Kafka クラスタ。
ターゲット クラスタ: MirrorMaker 2.0 がデータを複製する Kafka クラスタです。
プライマリ クラスタは、移行元または移行先として機能します。
プライマリ クラスタがソースの場合、セカンダリ クラスタはターゲットになります。データはプライマリ クラスタからセカンダリ クラスタに流れます。
プライマリ クラスタがターゲットの場合、セカンダリ クラスタはソースになります。データはセカンダリ クラスタからプライマリ クラスタに流れます。
書き込みオペレーションのレイテンシを最小限に抑えるには、ターゲット クラスタをプライマリ クラスタとして指定し、Connect クラスタをターゲット クラスタと同じリージョンに配置することをおすすめします。
コネクタのすべてのプロパティを正しく構成する必要があります。これには、セカンダリ クラスタを対象とするプロデューサー認証プロパティも含まれます。潜在的な問題の詳細については、MirrorMaker 2.0 クライアント構成を改善するをご覧ください。
始める前に
MirrorMaker 2.0 コネクタを作成するには、次の操作を行います。
Managed Service for Apache Kafka クラスタ(プライマリ)を作成します。このクラスタは、MirrorMaker 2.0 コネクタのエンドポイントの 1 つとして機能します。
セカンダリ Kafka クラスタを作成します。このクラスタは、もう一方のエンドポイントとして機能します。別の Managed Service for Apache Kafka クラスタ、または外部またはセルフマネージドの Kafka クラスタを指定できます。複数の Kafka クラスタを Connect クラスタのセカンダリ Kafka クラスタとして構成できます。
MirrorMaker 2.0 コネクタをホストする Connect クラスタを作成します。
セカンダリ Kafka クラスタの DNS ドメインが構成されていることを確認します。
Private Service Connect インターフェースが送信元と宛先の両方の Kafka クラスタに到達できるように、ファイアウォール ルールを構成します。
ソースまたはターゲットの Kafka クラスタにインターネット経由でアクセスする場合は、Connect ワーカーがインターネットにアクセスできるように Cloud NAT を構成します。
セカンダリ クラスタに外部またはセルフマネージドの Kafka クラスタが含まれている場合は、必要な認証情報が Secret リソースとして構成されていることを確認します。
ネットワーク要件の詳細については、ワーカー サブネットをご覧ください。
必要なロールと権限
コネクタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka Connector 編集者 (roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
コネクタを作成するには、次の権限が必要です。
-
コネクタを作成します。
managedkafka.connectors.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
別のプロジェクトに MirrorMaker 2.0 コネクタを作成する
プライマリ Managed Service for Apache Kafka クラスタが、MirrorMaker 2.0 コネクタを実行する Connect クラスタとは異なるプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。
セルフマネージド セカンダリ Kafka クラスタに接続する
セルフマネージドのセカンダリ Kafka クラスタに接続する場合は、ネットワーキングと認証に注意してください。
ネットワーキング: Connect クラスタの VPC ネットワークと、セルフマネージド クラスタまたは外部クラスタをホストするネットワーク間の接続を許可するように、適切な VPC ネットワーク設定とファイアウォール ルールが構成されていることを確認します。
VPC 内のクラスタについては、VPC ネットワークの作成と管理をご覧ください。
オンプレミス環境や他のクラウド環境に接続する場合は、Cloud VPN や Cloud Interconnect などのソリューションを検討してください。オンプレミス Kafka への接続に関する具体的なガイダンスもご覧ください。
認証と暗号化: Connect クラスタは、セルフマネージド クラスタまたは外部クラスタ(必要な場合)で認証を行い、TLS 暗号化を処理する必要があります。Kafka 認証の一般的な情報については、Apache Kafka のセキュリティに関するドキュメントをご覧ください。
認証情報に Secret Manager を使用する
Connect クラスタは Secret Manager と直接統合されます。パスワード、セルフマネージド クラスタまたは外部クラスタへの接続に必要なトラストストアとキーストアの内容など、機密性の高い構成値はすべて Secret Manager のシークレットとして保存します。
Connect クラスタ サービス アカウントに付与されたシークレットは、コネクタのランタイム環境内の
/var/secrets/ディレクトリにあるファイルとして自動的にマウントされます。ファイル名はパターン
{PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}に従います。プロジェクト番号ではなく、プロジェクト名を使用する必要があります。シークレットの参照方法は、Kafka プロパティがシークレットのパスワードを想定しているか、ファイルへのパスを想定しているかによって異なります。
パスワードには、Kafka の
DirectoryConfigProviderプロパティを使用します。値は${directory:/var/secrets}:{SECRET_FILENAME}の形式で指定します。例:password=${directory:/var/secrets}:my-project-db-password-1ファイルパスの場合は、マウントされたシークレット ファイルへの直接パスを指定します。例:
ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3
Connect クラスタの作成時にアクセス権を付与してシークレットを構成する方法の詳細については、Secret Manager シークレットを構成するをご覧ください。
MirrorMaker ソース コネクタの仕組み
MirrorMaker ソースコネクタは、ソースクラスタの 1 つ以上の Kafka トピックからデータを取得し、そのデータを ACL とともにターゲット クラスタのトピックに複製します。
MirrorMaker ソース コネクタがデータを複製する仕組みの詳細な内訳は次のとおりです。
コネクタは、ソース クラスタ内の指定された Kafka トピックからメッセージを消費します。レプリケートするトピックは、
topics構成プロパティを使用して指定します。このプロパティには、カンマ区切りのトピック名または単一の Java スタイルの正規表現を指定できます。たとえば、topic-a,topic-bやmy-prefix-.*です。コネクタは、
topics.excludeプロパティを使用して指定した特定のトピックの複製をスキップすることもできます。除外は包含よりも優先されます。コネクタは、使用されたメッセージをターゲット クラスタに書き込みます。
コネクタには、
source.cluster.bootstrap.serversやtarget.cluster.bootstrap.serversなどの移行元クラスタと移行先クラスタの接続の詳細が必要です。コネクタには、
source.cluster.aliasとtarget.cluster.aliasで指定されたソース クラスタとターゲット クラスタのエイリアスも必要です。デフォルトでは、複製されたトピックはソース エイリアスを使用して自動的に名前変更されます。たとえば、エイリアスprimaryのソースのordersという名前のトピックは、ターゲットでprimary.ordersになります。レプリケートされたトピックに関連付けられている ACL も、ソースクラスタからターゲット クラスタに同期されます。これは、
sync.topic.acls.enabledプロパティを使用して無効にできます。クラスタで必要な場合は、ソースクラスタとターゲット クラスタの両方に接続するための認証の詳細を構成で指定する必要があります。送信元には
source.cluster.、ターゲットにはtarget.cluster.を接頭辞として、security.protocol、sasl.mechanism、sasl.jaas.configなどのプロパティを構成する必要があります。コネクタは内部トピックに依存しています。これらのプロパティ(
offset-syncs.topic.replication.factorなど)を構成する必要がある場合があります。コネクタは Kafka レコード コンバータ
key.converter、value.converter、header.converterを使用します。直接レプリケーションの場合、通常はorg.apache.kafka.connect.converters.ByteArrayConverterがデフォルトで設定され、変換は行われません(パススルー)。tasks.maxプロパティは、コネクタの並列処理のレベルを制御します。tasks.maxを増やすと、スループットが向上する可能性がありますが、有効な並列処理は、レプリケートされるソース Kafka トピックのパーティション数によって制限されることがよくあります。
MirrorMaker 2.0 コネクタのプロパティ
MirrorMaker 2.0 コネクタを作成または更新するときに、次のプロパティを指定します。
コネクタ名
コネクタの名前または ID。リソースの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。名前は変更できません。
コネクタの種類
コネクタのタイプは次のいずれかである必要があります。
プライマリ Kafka クラスタ
Managed Service for Apache Kafka クラスタ。このフィールドは自動的に入力されます。
プライマリ Kafka クラスタをターゲット クラスタとして使用する: このオプションを選択すると、別の Kafka クラスタからプライマリ Managed Service for Apache Kafka クラスタにデータが移動します。
プライマリ Kafka クラスタをソースクラスタとして使用する: プライマリ Managed Service for Apache Kafka クラスタから別の Kafka クラスタにデータを移動する場合は、このオプションを選択します。
ターゲット クラスタまたはソースクラスタ
パイプラインのもう一方の端を形成するセカンダリ Kafka クラスタ。
Managed Service for Apache Kafka クラスタ: プルダウン メニューからクラスタを選択します。
セルフマネージドまたは外部の Kafka クラスタ: ブートストラップ アドレスを
hostname:port_number形式で入力します。例:kafka-test:9092。
トピック名または正規表現
複製するトピック。個々の名前(topic1、topic2)を指定するか、正規表現(topic.*)を使用します。このプロパティは、MirrorMaker 2.0 ソースコネクタに必要です。デフォルト値は .* です。
コンシューマー グループ名または正規表現
複製するコンシューマー グループ。個々の名前(group1、group2)を指定するか、正規表現(group.*)を使用します。このプロパティは、MirrorMaker 2.0 チェックポイント コネクタに必要です。デフォルト値は .* です。
構成
このセクションでは、MirrorMaker 2.0 コネクタのコネクタ固有の追加の構成プロパティを指定できます。
Kafka トピックのデータは、Avro、JSON、未加工のバイトなど、さまざまな形式で指定できるため、構成の重要な部分としてコンバータの指定があります。コンバータは、Kafka トピックで使用される形式のデータを Kafka Connect の標準化された内部形式に変換します。
Kafka Connect におけるコンバータの役割、サポートされているコンバータのタイプ、一般的な構成オプションの詳細については、コンバータをご覧ください。
すべての MirrorMaker 2.0 コネクタの一般的な構成は次のとおりです。
source.cluster.alias: ソースクラスタのエイリアス。target.cluster.alias: ターゲット クラスタのエイリアス。
データの複製時に特定のリソースを除外するために使用される構成:
topics.exclude: 除外されたトピック。カンマ区切りのトピック名と正規表現をサポートします。除外は包含よりも優先されます。MirrorMaker 2.0 Source コネクタに使用されます。デフォルト値はmm2.*.internal,.*.replica,__.*です。groups.exclude: グループを除外します。カンマ区切りのグループ ID と正規表現をサポートします。除外は包含よりも優先されます。MirrorMaker 2.0 チェックポイント コネクタに使用されます。デフォルト値はconsole-consumer-.*,connect-.*,__.*です。
MirrorMaker 2.0 コネクタには認証構成が必要です。
ソースまたはターゲットの Kafka クラスタが Managed Service for Apache Kafka クラスタの場合、Connect クラスタは OAuthBearer を使用して認証します。認証構成は事前構成されているため、構成を手動で設定する必要はありません。
セルフマネージドまたはオンプレミスの Kafka クラスタの場合、認証構成は Kafka クラスタがサポートする認証メカニズムによって異なります。ソース Kafka クラスタ構成の認証構成の例は次のとおりです。
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
ターゲット Kafka クラスタ構成の認証構成の例は次のとおりです。
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
使用可能な構成プロパティは、特定のコネクタによって異なります。サポートされている構成例を確認するには、サポートされている MirrorMaker 2.0 コネクタのバージョンを確認してください。次のドキュメントをご覧ください。
Kafka レコードの変換
Kafka Connect は、キーと値のデフォルトのコンバータとして org.apache.kafka.connect.converters.ByteArrayConverter を使用します。これにより、変換を行わないパススルー オプションが提供されます。
他のコンバータを使用するように header.converter、key.converter、value.converter を構成できます。
タスク数
tasks.max 値は、MirrorMaker コネクタの実行に Kafka Connect が使用する最大タスク数を構成します。コネクタの並列処理のレベルを制御します。タスク数を増やすとスループットが向上する可能性がありますが、Kafka トピック パーティションの数などの要因によって制限されます。
MirrorMaker 2.0 Source コネクタを作成する
コネクタを作成する前に、コネクタのプロパティのドキュメントを確認してください。
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
コネクタを作成する Connect クラスタをクリックします。
[クラスタの詳細を接続] ページが表示されます。
[コネクタを作成] をクリックします。
[Kafka コネクタの作成] ページが表示されます。
[コネクタ名] に文字列を入力します。
コネクタの命名方法の詳細については、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
[コネクタ プラグイン] で、[MirrorMaker 2.0 ソース] を選択します。
[プライマリ Kafka クラスタ] で、次のいずれかのオプションを選択します。
- プライマリ Kafka クラスタをソースクラスタとして使用する: Managed Service for Apache Kafka クラスタからデータを移動します。
- プライマリ Kafka クラスタをターゲット クラスタとして使用する: データを Managed Service for Apache Kafka クラスタに移動します。
[ターゲット クラスタ] または [ソース クラスタ] で、次のいずれかを選択します。
- Managed Service for Apache Kafka クラスタ: メニューから選択します。
- セルフマネージドまたは外部 Kafka クラスタ:
hostname:port_number形式でブートストラップ アドレスを入力します。
カンマ区切りのトピック名またはトピックの正規表現を入力します。
必要なセキュリティ設定を含む、構成を確認して調整します。
構成と認証の詳細については、構成をご覧ください。
[タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。
[作成] をクリックします。
gcloud
-
Google Cloud コンソールで Cloud Shell をアクティブにします。
Google Cloud コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。
gcloud managed-kafka connectors createコマンドを実行します。gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE次のように置き換えます。
CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
LOCATION: コネクタを作成するロケーション。これは、Connect クラスタを作成したロケーションと同じである必要があります。
CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。
CONFIG_FILE: コネクタの YAML 構成ファイルへのパス。
MirrorMaker 2.0 Source コネクタの構成ファイルの例を次に示します。
connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector" name: "MM2_CONNECTOR_ID" source.cluster.alias: "source" target.cluster.alias: "target" topics: "GMK_TOPIC_NAME" source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS" target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS" offset-syncs.topic.replication.factor: "1" source.cluster.security.protocol: "SASL_SSL" source.cluster.sasl.mechanism: "OAUTHBEARER" source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; target.cluster.security.protocol: "SASL_SSL" target.cluster.sasl.mechanism: "OAUTHBEARER" target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler" target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Terraform
Terraform リソースを使用してコネクタを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC を設定するをご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。