Cloud Storage Sink コネクタを使用すると、Kafka トピックから Cloud Storage バケットにデータをストリーミングできます。これは、大量のデータを費用対効果が高くスケーラブルな方法で保存して処理する場合に便利です。
始める前に
Cloud Storage Sink コネクタを作成する前に、次のことを確認してください。
Connect クラスタの Managed Service for Apache Kafka クラスタを作成します。これは、Connect クラスタに関連付けられているプライマリ Kafka クラスタです。これは、コネクタ パイプラインの一端を形成するソース クラスタでもあります。
Cloud Storage シンクコネクタをホストする Connect クラスタを作成します。
Kafka からストリーミングされたデータを保存する Cloud Storage バケットを作成します。
ソース クラスタ内に Kafka トピックを作成して構成します。データは、この Kafka トピックから宛先 Cloud Storage バケットに移動します。
必要なロールと権限
Cloud Storage シンク コネクタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka Connector 編集者 (roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、Cloud Storage Sink コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
Cloud Storage Sink コネクタを作成するには、次の権限が必要です。
-
親 Connect クラスタでコネクタの作成権限を付与します。
managedkafka.connectors.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Managed Kafka Connector 編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
Managed Service for Apache Kafka クラスタが Connect クラスタと同じプロジェクトにある場合、追加の権限は必要ありません。Connect クラスタが別のプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。
Cloud Storage バケットに書き込む権限を付与する
Connect クラスタ サービス アカウント(形式は service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com)には、次の Cloud Storage 権限が必要です。
storage.objects.createstorage.objects.delete
これを行うには、Cloud Storage バケットを含むプロジェクトの Connect クラスタ サービス アカウントに Storage オブジェクト ユーザー(roles/storage.objectUser)ロールを付与します。
Cloud Storage シンクコネクタの仕組み
Cloud Storage Sink コネクタは、1 つ以上の Kafka トピックからデータを取得し、そのデータを単一の Cloud Storage バケット内のオブジェクトに書き込みます。
Cloud Storage シンク コネクタがデータをコピーする仕組みの詳細を次に示します。
コネクタは、ソースクラスタ内の 1 つ以上の Kafka トピックからメッセージを消費します。
コネクタは、コネクタ構成で指定したターゲット Cloud Storage バケットにデータを書き込みます。
コネクタは、コネクタ構成の特定のプロパティを参照して、データを Cloud Storage バケットに書き込むときにデータをフォーマットします。デフォルトでは、出力ファイルは CSV 形式です。
format.output.typeプロパティを構成して、JSON などのさまざまな出力形式を指定できます。コネクタは、Cloud Storage バケットに書き込まれるファイルの名前も指定します。
file.name.prefixプロパティとfile.name.templateプロパティを使用して、ファイル名をカスタマイズできます。たとえば、ファイル名に Kafka トピック名やメッセージキーを含めることができます。Kafka レコードには、ヘッダー、キー、値の 3 つのコンポーネントがあります。
format.output.fieldsを設定してヘッダーを含めることで、出力ファイルにヘッダーを含めることができます。例:format.output.fields=value,headersformat.output.fieldsを設定してkeyを含めることで、出力ファイルにキーを含めることができます。例:format.output.fields=key,value,headersキーは、
file.name.templateプロパティにkeyを含めることで、レコードをグループ化するためにも使用できます。
format.output.fieldsのデフォルトはvalueであるため、デフォルトで出力ファイルに値を含めることができます。コネクタは、変換およびフォーマットされたデータを指定された Cloud Storage バケットに書き込みます。
file.compression.typeプロパティを使用してファイル圧縮を構成すると、コネクタは Cloud Storage バケットに保存されているファイルを圧縮します。コンバータ構成は
format.output.typeプロパティによって制限されます。たとえば、
format.output.typeがcsvに設定されている場合、キー コンバータはorg.apache.kafka.connect.converters.ByteArrayConverterまたはorg.apache.kafka.connect.storage.StringConverterで、値コンバータはorg.apache.kafka.connect.converters.ByteArrayConverterである必要があります。format.output.typeがjsonに設定されている場合、value.converter.schemas.enableプロパティが true であっても、値とキーのスキーマは出力ファイルのデータとともに書き込まれません。
tasks.maxプロパティは、コネクタの並列処理のレベルを制御します。tasks.maxを増やすとスループットが向上しますが、実際の並列処理は Kafka トピックのパーティション数によって制限されます。
Cloud Storage シンクコネクタのプロパティ
Cloud Storage シンクコネクタを作成するときに、次のプロパティを指定します。
コネクタ名
コネクタの名前または ID。リソースの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。名前は変更できません。
コネクタ プラグインのタイプ
Google Cloud コンソールで、コネクタ プラグイン タイプとして [Cloud Storage シンク] を選択します。ユーザー インターフェースを使用してコネクタを構成しない場合は、コネクタ クラスも指定する必要があります。
トピック
コネクタがメッセージを消費する Kafka トピック。1 つ以上のトピックを指定することも、正規表現を使用して複数のトピックを照合することもできます。たとえば、topic.* は「topic」で始まるすべてのトピックに一致します。これらのトピックは、Connect クラスタに関連付けられている Managed Service for Apache Kafka クラスタ内に存在する必要があります。
Cloud Storage バケット
データが保存される Cloud Storage バケットを選択または作成します。
構成
このセクションでは、Cloud Storage Sink コネクタのコネクタ固有の追加の構成プロパティを指定できます。
Kafka トピックのデータは、Avro、JSON、未加工のバイトなど、さまざまな形式で指定できるため、構成の重要な部分としてコンバータの指定があります。コンバータは、Kafka トピックで使用される形式のデータを Kafka Connect の標準化された内部形式に変換します。Cloud Storage シンク コネクタは、この内部データを取得し、Cloud Storage バケットで必要な形式に変換してから書き込みます。
Kafka Connect のコンバータの役割、サポートされているコンバータのタイプ、一般的な構成オプションの詳細については、コンバータをご覧ください。
Cloud Storage シンクコネクタに固有の構成を次に示します。
gcs.credentials.default: 実行環境から Google Cloud 認証情報を自動的に検出するかどうか。trueに設定する必要があります。gcs.bucket.name: データの書き込み先となる Cloud Storage バケットの名前を指定します。設定する必要があります。file.compression.type: Cloud Storage バケットに保存されているファイルの圧縮タイプを設定します。たとえば、gzip、snappy、zstd、noneなどがあります。デフォルト値はnoneです。file.name.prefix: Cloud Storage バケットに保存される各ファイルの名前に追加される接頭辞。デフォルト値は空です。format.output.type: Cloud Storage 出力ファイルへのデータの書き込みに使用されるデータ形式のタイプ。サポートされている値は、csv、json、jsonl、parquetです。デフォルト値はcsvです。
このコネクタに固有の構成プロパティのリストについては、Cloud Storage シンク コネクタの構成をご覧ください。
Cloud Storage シンクコネクタを作成する
コネクタを作成する前に、Cloud Storage シンクコネクタのプロパティのドキュメントを確認してください。
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
コネクタを作成する Connect クラスタをクリックします。
[クラスタの詳細を接続] ページが表示されます。
[コネクタを作成] をクリックします。
[Kafka コネクタの作成] ページが表示されます。
コネクタ名には文字列を入力します。
コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
[コネクタ プラグイン] で [Cloud Storage シンク] を選択します。
データをストリーミングできるトピックを指定します。
データを保存する Storage バケットを選択します。
(省略可)[構成] セクションで追加の設定を行います。
[タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。
[作成] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connectors createコマンドを実行します。gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE次のように置き換えます。
CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
LOCATION: コネクタを作成するロケーション。これは、Connect クラスタを作成したロケーションと同じである必要があります。
CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。
CONFIG_FILE: BigQuery Sink コネクタの YAML 構成ファイルへのパス。
Cloud Storage Sink コネクタの構成ファイルの例を次に示します。
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"次のように置き換えます。
GMK_TOPIC_ID: データが Cloud Storage Sink コネクタに流れる Managed Service for Apache Kafka トピックの ID。
GCS_BUCKET_NAME: パイプラインのシンクとして機能する Cloud Storage バケットの名前。
GCS_SINK_CONNECTOR_ID: Cloud Storage Sink コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
Terraform
Terraform リソースを使用してコネクタを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
コネクタを作成した後は、コネクタの編集、削除、一時停止、停止、再起動を行うことができます。