BigQuery Sink コネクタを使用すると、Kafka から BigQuery にデータをストリーミングして、BigQuery 内でリアルタイムのデータ取り込みと分析を行うことができます。BigQuery Sink コネクタは、1 つ以上の Kafka トピックからレコードを消費し、単一の BigQuery データセット内の 1 つ以上のテーブルにデータを書き込みます。
始める前に
BigQuery シンクコネクタを作成する前に、次のものがあることを確認してください。
Connect クラスタの Managed Service for Apache Kafka クラスタを作成します。このクラスタは、Connect クラスタに関連付けられているプライマリ Kafka クラスタです。このクラスタは、BigQuery シンクコネクタ パイプラインの一端を形成するソースクラスタでもあります。
BigQuery Sink コネクタをホストする Connect クラスタを作成します。
Kafka からストリーミングされたデータを保存する BigQuery データセットを作成します。
ソース クラスタ内に Kafka トピックを作成して構成します。データは、この Kafka トピックから宛先 BigQuery データセットに移動します。
必要なロールと権限
BigQuery Sink コネクタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka Connector 編集者 (roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、BigQuery Sink コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
BigQuery Sink コネクタを作成するには、次の権限が必要です。
-
親 Connect クラスタでコネクタの作成権限を付与します。
managedkafka.connectors.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Managed Kafka Connector 編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
Managed Service for Apache Kafka クラスタが Connect クラスタと同じプロジェクトにある場合、追加の権限は必要ありません。クラスタが別のプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。
BigQuery テーブルへの書き込み権限を付与する
Connect クラスタのサービス アカウント(service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com 形式)には、BigQuery テーブルへの書き込み権限が必要です。これを行うには、BigQuery テーブルを含むプロジェクトの Connect クラスタ サービス アカウントに BigQuery データ編集者(roles/bigquery.dataEditor)ロールを付与します。
BigQuery シンクコネクタのスキーマ
BigQuery シンクコネクタは、構成された値コンバータ(value.converter)を使用して、Kafka レコード値をフィールドに解析します。次に、フィールドを BigQuery テーブルの同じ名前の列に書き込みます。
コネクタの動作にはスキーマが必要です。スキーマは次の方法で指定できます。
- メッセージベースのスキーマ: スキーマは各メッセージの一部として含まれます。
- テーブルベースのスキーマ: コネクタは、BigQuery テーブル スキーマからメッセージ スキーマを推測します。
- スキーマ レジストリ: コネクタは、Managed Service for Apache Kafka スキーマ レジストリ(プレビュー)などのスキーマ レジストリからスキーマを読み取ります。
以降のセクションでは、これらのオプションについて説明します。
メッセージベースのスキーマ
このモードでは、各 Kafka レコードに JSON スキーマが含まれます。コネクタは、スキーマを使用してレコードデータを BigQuery テーブルの行として書き込みます。
メッセージベースのスキーマを使用するには、コネクタで次のプロパティを設定します。
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Kafka レコード値の例:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
宛先テーブルがすでに存在する場合、BigQuery テーブルのスキーマは埋め込みメッセージのスキーマと互換性がある必要があります。autoCreateTables=true の場合、コネクタは必要に応じて宛先テーブルを自動的に作成します。詳細については、テーブルの作成をご覧ください。
メッセージ スキーマの変更に合わせてコネクタで BigQuery テーブル スキーマを更新する場合は、allowNewBigQueryFields、allowSchemaUnionization、または allowBigQueryRequiredFieldRelaxation を true に設定します。
テーブルベースのスキーマ
このモードでは、Kafka レコードには明示的なスキーマのないプレーンな JSON データが含まれます。コネクタは、宛先テーブルからスキーマを推測します。
要件:
- BigQuery テーブルはすでに存在している必要があります。
- Kafka レコードデータはテーブル スキーマと互換性がある必要があります。
- このモードでは、受信メッセージに基づく動的スキーマ更新はサポートされていません。
テーブルベースのスキーマを使用するには、コネクタで次のプロパティを設定します。
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
BigQuery テーブルで毎日パーティショニングする時間ベースのパーティショニングを使用している場合、bigQueryPartitionDecorator は true になります。それ以外の場合は、このプロパティを false に設定します。
Kafka レコード値の例:
{
"user": "userId",
"age": 30
}
スキーマ レジストリ
このモードでは、各 Kafka レコードに Apache Avro データが含まれ、メッセージ スキーマはスキーマ レジストリに保存されます。
スキーマ レジストリで BigQuery Sink コネクタを使用するには、コネクタで次のプロパティを設定します。
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
SCHEMA_REGISTRY_URL は、スキーマ レジストリの URL に置き換えます。
Managed Service for Apache Kafka スキーマ レジストリでコネクタを使用するには、次のプロパティを設定します。
value.converter.bearer.auth.credentials.source=GCP
詳細については、スキーマ レジストリで Kafka Connect を使用するをご覧ください。
BigQuery 内の Apache Iceberg 用 BigLake テーブル
BigQuery シンク コネクタは、シンク ターゲットとして BigQuery 内の Apache Iceberg 用 BigLake テーブル(以下、BigQuery 内の BigLake Iceberg テーブル)をサポートしています。
BigQuery の BigLake Iceberg テーブルは、 Google Cloudでオープン形式のレイクハウスを構築するための基盤になります。BigQuery の BigLake Iceberg テーブルは、BigQuery テーブルと同じフルマネージド エクスペリエンスを提供します。また、Parquet を使用してお客様所有のストレージ バケットにデータを保存することで、オープン形式の Apache Iceberg テーブル形式と相互運用が可能です。
Apache Iceberg テーブルの作成方法については、Apache Iceberg テーブルを作成するをご覧ください。
BigQuery シンクコネクタを作成する
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
コネクタを作成する Connect クラスタをクリックします。
[コネクタを作成] をクリックします。
コネクタ名には文字列を入力します。
コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
[コネクタ プラグイン] で、[BigQuery シンク] を選択します。
[トピック] セクションで、読み取る Kafka トピックを指定します。トピックのリストまたはトピック名と照合する正規表現を指定できます。
オプション 1: [Kafka トピックのリストを選択する] を選択します。[Kafka トピック] リストで、1 つ以上のトピックを選択します。[OK] をクリックします。
オプション 2: [トピックの正規表現を使用する] を選択します。[トピックの正規表現] フィールドに正規表現を入力します。
[データセット] をクリックし、BigQuery データセットを指定します。既存のデータセットを選択することも、新しいデータセットを作成することもできます。
省略可: [構成] ボックスで、構成プロパティを追加するか、デフォルトのプロパティを編集します。詳細については、コネクタを構成するをご覧ください。
[タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。
[作成] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connectors createコマンドを実行します。gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE次のように置き換えます。
CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
LOCATION: コネクタを作成するロケーション。これは、Connect クラスタを作成したロケーションと同じである必要があります。
CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。
CONFIG_FILE: BigQuery Sink コネクタの YAML 構成ファイルへのパス。
BigQuery Sink コネクタの構成ファイルの例を次に示します。
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"次のように置き換えます。
BQ_SINK_CONNECTOR_ID: BigQuery Sink コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。
GCP_PROJECT_ID: BigQuery データセットが存在する Google Cloudプロジェクトの ID。
GMK_TOPIC_ID: データが BigQuery シンクコネクタに流れる Managed Service for Apache Kafka トピックの ID。
BQ_DATASET_ID: パイプラインのシンクとして機能する BigQuery データセットの ID。
Terraform
Terraform リソースを使用してコネクタを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
コネクタを作成した後は、コネクタの編集、削除、一時停止、停止、再起動を行うことができます。
コネクタを構成する
このセクションでは、コネクタで設定できる構成プロパティについて説明します。このコネクタに固有のプロパティの一覧については、BigQuery Sink コネクタの構成をご覧ください。
テーブル名
デフォルトでは、コネクタはトピック名を BigQuery テーブル名として使用します。別のテーブル名を使用するには、次の形式で topic2TableMap プロパティを設定します。
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
テーブルの作成
BigQuery シンクコネクタは、宛先テーブルが存在しない場合に作成できます。
autoCreateTables=trueの場合、コネクタは存在しない BigQuery テーブルの作成を試みます。この設定がデフォルトの動作です。autoCreateTables=falseの場合、コネクタはテーブルを作成しません。宛先テーブルが存在しない場合は、エラーが発生します。
autoCreateTables が true の場合、次の構成プロパティを使用して、コネクタが新しいテーブルを作成して構成する方法をより詳細に制御できます。
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
これらのプロパティの詳細については、BigQuery Sink コネクタの構成をご覧ください。
Kafka メタデータ
kafkaDataFieldName フィールドと kafkaKeyFieldName フィールドをそれぞれ構成することで、メタデータ情報やキー情報などの Kafka からの追加データを BigQuery テーブルにマッピングできます。メタデータ情報の例としては、Kafka トピック、パーティション、オフセット、挿入時間などがあります。