MirrorMaker 2.0 は、Kafka クラスタ間でトピックを複製するツールです。次の MirrorMaker 2.0 コネクタを作成できます。
MirrorMaker 2.0 Source
MirrorMaker 2.0 Checkpoint
MirrorMaker 2.0 Heartbeat
MirrorMaker 2.0 ソースコネクタは、ソースクラスタからターゲット クラスタにデータをミラーリングするため、常に必要です。また、ACL も同期されます。MirrorMaker 2.0 のチェックポイント コネクタとハートビート コネクタは省略可能です。ソース コネクタを作成せずに、MirrorMaker 2.0 チェックポイント コネクタとハートビート コネクタを作成することもできます。
これらのコネクタの詳細については、コネクタの概要をご覧ください。
MirrorMaker 2.0 のクラスタロールについて
MirrorMaker 2.0 を構成する際は、Kafka クラスタが果たすさまざまな役割を理解することが重要です。
プライマリ クラスタ: Managed Service for Apache Kafka のコンテキストでは、Kafka Connect クラスタが直接接続されている Managed Service for Apache Kafka クラスタです。Connect クラスタは、MirrorMaker 2.0 コネクタ インスタンスをホストします。
セカンダリ クラスタ: レプリケーションに関与する他の Kafka クラスタです。別の Managed Service for Apache Kafka クラスタまたは外部クラスタを指定できます。たとえば、Compute Engine、GKE、オンプレミス、別のクラウドでセルフマネージドできます。
ソースクラスタ: MirrorMaker 2.0 がデータを複製する Kafka クラスタ。
ターゲット クラスタ: MirrorMaker 2.0 がデータを複製する Kafka クラスタです。
プライマリ クラスタは、移行元または移行先として機能します。
プライマリ クラスタがソースの場合、セカンダリ クラスタはターゲットになります。データはプライマリ クラスタからセカンダリ クラスタに流れます。
プライマリ クラスタがターゲットの場合、セカンダリ クラスタはソースになります。データはセカンダリ クラスタからプライマリ クラスタに流れます。
書き込みオペレーションのレイテンシを最小限に抑えるには、ターゲット クラスタをプライマリ クラスタとして指定し、Connect クラスタをターゲット クラスタと同じリージョンに配置することをおすすめします。
コネクタのすべてのプロパティを正しく構成する必要があります。これには、セカンダリ クラスタを対象とするプロデューサー認証プロパティも含まれます。潜在的な問題の詳細については、MirrorMaker 2.0 クライアント構成を改善するをご覧ください。
始める前に
MirrorMaker 2.0 コネクタを作成するには、次の操作を行います。
Managed Service for Apache Kafka クラスタ(プライマリ)を作成します。このクラスタは、MirrorMaker 2.0 コネクタのエンドポイントの 1 つとして機能します。
セカンダリ Kafka クラスタを作成します。このクラスタは、もう一方のエンドポイントとして機能します。別の Managed Service for Apache Kafka クラスタ、または外部またはセルフマネージドの Kafka クラスタを指定できます。複数の Kafka クラスタを Connect クラスタのセカンダリ Kafka クラスタとして構成できます。
MirrorMaker 2.0 コネクタをホストする Connect クラスタを作成します。
セカンダリ Kafka クラスタの DNS ドメインが構成されていることを確認します。
Private Service Connect インターフェースが送信元と宛先の両方の Kafka クラスタに到達できるように、ファイアウォール ルールを構成します。
ソースまたはターゲットの Kafka クラスタのいずれかにインターネット経由でアクセスする場合は、Connect ワーカーがインターネットにアクセスできるように Cloud NAT を構成します。
セカンダリ クラスタに外部またはセルフマネージドの Kafka クラスタが含まれている場合は、必要な認証情報が Secret リソースとして構成されていることを確認します。
ネットワーク要件の詳細については、ワーカー サブネットをご覧ください。
必要なロールと権限
MirrorMaker 2.0 コネクタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka Connector 編集者 (roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、MirrorMaker 2.0 コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
MirrorMaker 2.0 コネクタを作成するには、次の権限が必要です。
-
親 Connect クラスタでコネクタの作成権限を付与します。
managedkafka.connectors.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Managed Kafka Connector 編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
別のプロジェクトに MirrorMaker 2.0 コネクタを作成する
プライマリ Managed Service for Apache Kafka クラスタが、MirrorMaker 2.0 コネクタを実行する Connect クラスタとは異なるプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。
セルフマネージド セカンダリ Kafka クラスタに接続する
セルフマネージドのセカンダリ Kafka クラスタに接続する場合は、ネットワーキングと認証に注意してください。
ネットワーキング: Connect クラスタの VPC ネットワークと、セルフマネージド クラスタまたは外部クラスタをホストするネットワーク間の接続を許可するように、適切な VPC ネットワーク設定とファイアウォール ルールが構成されていることを確認します。
VPC 内のクラスタについては、VPC ネットワークの作成と管理をご覧ください。
オンプレミス環境や他のクラウド環境に接続する場合は、Cloud VPN や Cloud Interconnect などのソリューションを検討してください。オンプレミス Kafka への接続に関する具体的なガイダンスもご覧ください。
認証と暗号化: Connect クラスタは、セルフマネージド クラスタまたは外部クラスタ(必要な場合)で認証を行い、TLS 暗号化を処理する必要があります。Kafka 認証の一般的な情報については、Apache Kafka のセキュリティに関するドキュメントをご覧ください。
認証情報に Secret Manager を使用する
Connect クラスタは Secret Manager と直接統合されます。パスワード、セルフマネージド クラスタまたは外部クラスタへの接続に必要なトラストストアとキーストアの内容など、機密性の高い構成値はすべて Secret Manager のシークレットとして保存します。
Connect クラスタ サービス アカウントに付与されたシークレットは、コネクタのランタイム環境内の
/var/secrets/ディレクトリにあるファイルとして自動的にマウントされます。ファイル名はパターン
{PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}に従います。プロジェクト番号ではなく、プロジェクト名を使用する必要があります。シークレットの参照方法は、Kafka プロパティがシークレットのパスワードを想定しているか、ファイルのパスを想定しているかによって異なります。
パスワードには、Kafka の
DirectoryConfigProviderプロパティを使用します。値は${directory:/var/secrets}:{SECRET_FILENAME}の形式で指定します。例:password=${directory:/var/secrets}:my-project-db-password-1ファイルパスの場合は、マウントされたシークレット ファイルへの直接パスを指定します。例:
ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3
Connect クラスタの作成時にアクセス権を付与してシークレットを構成する方法の詳細については、Secret Manager シークレットを構成するをご覧ください。
MirrorMaker ソース コネクタの仕組み
MirrorMaker ソースコネクタは、ソースクラスタの 1 つ以上の Kafka トピックからデータを取得し、そのデータを ACL とともにターゲット クラスタのトピックに複製します。
MirrorMaker ソース コネクタがデータを複製する仕組みの詳細を以下に示します。
コネクタは、ソース クラスタ内の指定された Kafka トピックからメッセージを消費します。レプリケートするトピックは、
topics構成プロパティを使用して指定します。このプロパティには、カンマ区切りのトピック名または単一の Java スタイルの正規表現を指定できます。たとえば、topic-a,topic-bやmy-prefix-.*です。コネクタは、
topics.excludeプロパティを使用して指定した特定のトピックの複製をスキップすることもできます。除外は包含よりも優先されます。コネクタは、使用されたメッセージをターゲット クラスタに書き込みます。
コネクタには、
source.cluster.bootstrap.serversやtarget.cluster.bootstrap.serversなどの移行元クラスタと移行先クラスタの接続の詳細が必要です。コネクタには、
source.cluster.aliasとtarget.cluster.aliasで指定されたソース クラスタとターゲット クラスタのエイリアスも必要です。デフォルトでは、複製されたトピックはソース エイリアスを使用して自動的に名前変更されます。たとえば、エイリアスprimaryのソースのordersという名前のトピックは、ターゲットでprimary.ordersになります。レプリケートされたトピックに関連付けられている ACL も、ソースクラスタからターゲット クラスタに同期されます。これは、
sync.topic.acls.enabledプロパティを使用して無効にできます。クラスタで必要な場合は、移行元クラスタと移行先クラスタの両方に接続するための認証の詳細を構成で指定する必要があります。送信元には
source.cluster.、ターゲットにはtarget.cluster.を接頭辞として、security.protocol、sasl.mechanism、sasl.jaas.configなどのプロパティを構成する必要があります。コネクタは内部トピックに依存しています。これらのプロパティ(
offset-syncs.topic.replication.factorなど)を構成する必要がある場合があります。コネクタは、Kafka レコード コンバータ
key.converter、value.converter、header.converterを使用します。直接レプリケーションの場合、通常はorg.apache.kafka.connect.converters.ByteArrayConverterがデフォルトで設定されます。これは変換を行いません(パススルー)。tasks.maxプロパティは、コネクタの並列処理のレベルを制御します。tasks.maxを増やすとスループットが向上する可能性がありますが、有効な並列処理は、レプリケートされるソース Kafka トピックのパーティション数によって制限されることがよくあります。
MirrorMaker 2.0 コネクタのプロパティ
MirrorMaker 2.0 コネクタを作成または更新するときに、次のプロパティを指定します。
コネクタ名
コネクタの名前または ID。リソースの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。名前は変更できません。
コネクタの種類
コネクタのタイプは次のいずれかである必要があります。
プライマリ Kafka クラスタ
Managed Service for Apache Kafka クラスタ。このフィールドは自動的に入力されます。
プライマリ Kafka クラスタをターゲット クラスタとして使用する: このオプションを選択すると、別の Kafka クラスタからプライマリ Managed Service for Apache Kafka クラスタにデータが移動します。
プライマリ Kafka クラスタをソースクラスタとして使用する: このオプションを選択すると、プライマリ Managed Service for Apache Kafka クラスタから別の Kafka クラスタにデータが移動します。
ターゲット クラスタまたはソースクラスタ
パイプラインのもう一方の端を形成するセカンダリ Kafka クラスタ。
Managed Service for Apache Kafka クラスタ: プルダウン メニューからクラスタを選択します。
セルフマネージドまたは外部の Kafka クラスタ: ブートストラップ アドレスを
hostname:port_number形式で入力します。例:kafka-test:9092。
トピック名または正規表現
複製するトピック。個々の名前(topic1、topic2)を指定するか、正規表現(topic.*)を使用します。このプロパティは、MirrorMaker 2.0 ソース コネクタに必要です。デフォルト値は .* です。
コンシューマー グループ名または正規表現
複製するコンシューマー グループ。個々の名前(group1、group2)を指定するか、正規表現(group.*)を使用します。このプロパティは、MirrorMaker 2.0 チェックポイント コネクタに必要です。デフォルト値は .* です。
構成
このセクションでは、MirrorMaker 2.0 コネクタのコネクタ固有の追加の構成プロパティを指定できます。
Kafka トピックのデータは、Avro、JSON、未加工のバイトなど、さまざまな形式で指定できるため、構成の重要な部分としてコンバータの指定があります。コンバータは、Kafka トピックで使用される形式のデータを Kafka Connect の標準化された内部形式に変換します。
Kafka Connect におけるコンバータの役割、サポートされているコンバータのタイプ、一般的な構成オプションの詳細については、コンバータをご覧ください。
すべての MirrorMaker 2.0 コネクタの一般的な構成は次のとおりです。
source.cluster.alias: ソースクラスタのエイリアス。target.cluster.alias: ターゲット クラスタのエイリアス。
データの複製時に特定のリソースを除外するために使用される構成:
topics.exclude: 除外されたトピック。カンマ区切りのトピック名と正規表現をサポートします。除外は包含よりも優先されます。MirrorMaker 2.0 Source コネクタに使用されます。デフォルト値はmm2.*.internal,.*.replica,__.*です。groups.exclude: グループを除外します。カンマ区切りのグループ ID と正規表現をサポートします。除外は包含よりも優先されます。MirrorMaker 2.0 チェックポイント コネクタに使用されます。デフォルト値はconsole-consumer-.*,connect-.*,__.*です。
MirrorMaker 2.0 コネクタには認証構成が必要です。
移行元または移行先の Kafka クラスタが Managed Service for Apache Kafka クラスタの場合、Connect クラスタは OAuthBearer を使用して認証を行います。認証構成は事前構成されているため、構成を手動で設定する必要はありません。
セルフマネージドまたはオンプレミスの Kafka クラスタの場合、認証構成は Kafka クラスタがサポートする認証メカニズムによって異なります。ソース Kafka クラスタ構成の認証構成の例は次のとおりです。
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
ターゲット Kafka クラスタ構成の認証構成の例は次のとおりです。
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
使用可能な構成プロパティは、コネクタによって異なります。サポートされている構成例を確認するには、サポートされている MirrorMaker 2.0 コネクタのバージョンを確認してください。次のドキュメントをご覧ください。
Kafka レコードの変換
Kafka Connect は、キーと値のデフォルトのコンバータとして org.apache.kafka.connect.converters.ByteArrayConverter を使用します。これにより、変換を行わないパススルー オプションが提供されます。
他のコンバータを使用するように header.converter、key.converter、value.converter を構成できます。
タスク数
tasks.max 値は、MirrorMaker コネクタの実行に Kafka Connect が使用する最大タスク数を構成します。コネクタの並列処理のレベルを制御します。タスク数を増やすとスループットが向上する可能性がありますが、Kafka トピック パーティションの数などの要因によって制限されます。
MirrorMaker 2.0 ソースコネクタを作成する
コネクタを作成する前に、コネクタのプロパティのドキュメントを確認してください。
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
コネクタを作成する Connect クラスタをクリックします。
[クラスタの詳細を接続] ページが表示されます。
[コネクタを作成] をクリックします。
[Kafka コネクタの作成] ページが表示されます。
[コネクタ名] に文字列を入力します。
コネクタの命名方法の詳細については、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
[コネクタ プラグイン] で、[MirrorMaker 2.0 ソース] を選択します。
[プライマリ Kafka クラスタ] で、次のいずれかのオプションを選択します。
- プライマリ Kafka クラスタをソースクラスタとして使用する: Managed Service for Apache Kafka クラスタからデータを移動します。
- プライマリ Kafka クラスタをターゲット クラスタとして使用する: データを Managed Service for Apache Kafka クラスタに移動します。
[移行先クラスタ] または [移行元クラスタ] で、次のいずれかを選択します。
- Managed Service for Apache Kafka クラスタ: メニューから選択します。
- セルフマネージドまたは外部 Kafka クラスタ:
hostname:port_numberの形式でブートストラップ アドレスを入力します。
カンマ区切りのトピック名またはトピックの正規表現を入力します。
必要なセキュリティ設定を含む、構成を確認して調整します。
構成と認証の詳細については、構成をご覧ください。
[タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。
[作成] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connectors createコマンドを実行します。gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE次のように置き換えます。
CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
LOCATION: コネクタを作成するロケーション。これは、Connect クラスタを作成したロケーションと同じである必要があります。
CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。
CONFIG_FILE: BigQuery Sink コネクタの YAML 構成ファイルへのパス。
MirrorMaker 2.0 Source コネクタの構成ファイルの例を次に示します。
connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector" name: "MM2_CONNECTOR_ID" source.cluster.alias: "source" target.cluster.alias: "target" topics: "GMK_TOPIC_NAME" source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS" target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS" offset-syncs.topic.replication.factor: "1" source.cluster.security.protocol: "SASL_SSL" source.cluster.sasl.mechanism: "OAUTHBEARER" source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; target.cluster.security.protocol: "SASL_SSL" target.cluster.sasl.mechanism: "OAUTHBEARER" target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler" target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Terraform
Terraform リソースを使用してコネクタを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。