BigQuery Sink コネクタを作成する

BigQuery Sink コネクタを使用すると、Kafka から BigQuery にデータをストリーミングして、BigQuery 内でリアルタイムのデータ取り込みと分析を行うことができます。BigQuery Sink コネクタは、1 つ以上の Kafka トピックからレコードを消費し、単一の BigQuery データセット内の 1 つ以上のテーブルにデータを書き込みます。

始める前に

BigQuery シンクコネクタを作成する前に、次のものがあることを確認してください。

必要なロールと権限

BigQuery Sink コネクタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka Connector 編集者 roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、BigQuery Sink コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

BigQuery Sink コネクタを作成するには、次の権限が必要です。

  • 親 Connect クラスタでコネクタの作成権限を付与します。 managedkafka.connectors.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka Connector 編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

Managed Service for Apache Kafka クラスタが Connect クラスタと同じプロジェクトにある場合、追加の権限は必要ありません。クラスタが別のプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。

BigQuery テーブルへの書き込み権限を付与する

Connect クラスタのサービス アカウント(service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com 形式)には、BigQuery テーブルへの書き込み権限が必要です。これを行うには、BigQuery テーブルを含むプロジェクトの Connect クラスタ サービス アカウントに BigQuery データ編集者roles/bigquery.dataEditor)ロールを付与します。

BigQuery シンクコネクタのスキーマ

BigQuery シンクコネクタは、構成された値コンバータ(value.converter)を使用して、Kafka レコード値をフィールドに解析します。次に、フィールドを BigQuery テーブルの同じ名前の列に書き込みます。

コネクタの動作にはスキーマが必要です。スキーマは次の方法で指定できます。

  • メッセージベースのスキーマ: スキーマは各メッセージの一部として含まれます。
  • テーブルベースのスキーマ: コネクタは、BigQuery テーブル スキーマからメッセージ スキーマを推測します。
  • スキーマ レジストリ: コネクタは、Managed Service for Apache Kafka スキーマ レジストリプレビュー)などのスキーマ レジストリからスキーマを読み取ります。

以降のセクションでは、これらのオプションについて説明します。

メッセージベースのスキーマ

このモードでは、各 Kafka レコードに JSON スキーマが含まれます。コネクタは、スキーマを使用してレコードデータを BigQuery テーブルの行として書き込みます。

メッセージベースのスキーマを使用するには、コネクタで次のプロパティを設定します。

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Kafka レコード値の例:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "int64",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

宛先テーブルがすでに存在する場合、BigQuery テーブルのスキーマは埋め込みメッセージのスキーマと互換性がある必要があります。autoCreateTables=true の場合、コネクタは必要に応じて宛先テーブルを自動的に作成します。詳細については、テーブルの作成をご覧ください。

メッセージ スキーマの変更に合わせてコネクタで BigQuery テーブル スキーマを更新する場合は、allowNewBigQueryFieldsallowSchemaUnionization、または allowBigQueryRequiredFieldRelaxationtrue に設定します。

テーブルベースのスキーマ

このモードでは、Kafka レコードには明示的なスキーマのないプレーンな JSON データが含まれます。コネクタは、宛先テーブルからスキーマを推測します。

要件:

  • BigQuery テーブルはすでに存在している必要があります。
  • Kafka レコードデータはテーブル スキーマと互換性がある必要があります。
  • このモードでは、受信メッセージに基づく動的スキーマ更新はサポートされていません。

テーブルベースのスキーマを使用するには、コネクタで次のプロパティを設定します。

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

BigQuery テーブルで毎日パーティショニングする時間ベースのパーティショニングを使用している場合、bigQueryPartitionDecoratortrue になります。それ以外の場合は、このプロパティを false に設定します。

Kafka レコード値の例:

{
  "user": "userId",
  "age": 30
}

スキーマ レジストリ

このモードでは、各 Kafka レコードに Apache Avro データが含まれ、メッセージ スキーマはスキーマ レジストリに保存されます。

スキーマ レジストリで BigQuery Sink コネクタを使用するには、コネクタで次のプロパティを設定します。

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

SCHEMA_REGISTRY_URL は、スキーマ レジストリの URL に置き換えます。

Managed Service for Apache Kafka スキーマ レジストリでコネクタを使用するには、次のプロパティを設定します。

  • value.converter.bearer.auth.credentials.source=GCP

詳細については、スキーマ レジストリで Kafka Connect を使用するをご覧ください。

BigQuery 内の Apache Iceberg 用 BigLake テーブル

BigQuery シンク コネクタは、シンク ターゲットとして BigQuery 内の Apache Iceberg 用 BigLake テーブル(以下、BigQuery 内の BigLake Iceberg テーブル)をサポートしています。

BigQuery の BigLake Iceberg テーブルは、 Google Cloudでオープン形式のレイクハウスを構築するための基盤になります。BigQuery の BigLake Iceberg テーブルは、BigQuery テーブルと同じフルマネージド エクスペリエンスを提供します。また、Parquet を使用してお客様所有のストレージ バケットにデータを保存することで、オープン形式の Apache Iceberg テーブル形式と相互運用が可能です。

Apache Iceberg テーブルの作成方法については、Apache Iceberg テーブルを作成するをご覧ください。

BigQuery シンクコネクタを作成する

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. コネクタを作成する Connect クラスタをクリックします。

  3. [コネクタを作成] をクリックします。

  4. コネクタ名には文字列を入力します。

    コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。

  5. [コネクタ プラグイン] で、[BigQuery シンク] を選択します。

  6. [トピック] セクションで、読み取る Kafka トピックを指定します。トピックのリストまたはトピック名と照合する正規表現を指定できます。

    • オプション 1: [Kafka トピックのリストを選択する] を選択します。[Kafka トピック] リストで、1 つ以上のトピックを選択します。[OK] をクリックします。

    • オプション 2: [トピックの正規表現を使用する] を選択します。[トピックの正規表現] フィールドに正規表現を入力します。

  7. [データセット] をクリックし、BigQuery データセットを指定します。既存のデータセットを選択することも、新しいデータセットを作成することもできます。

  8. 省略可: [構成] ボックスで、構成プロパティを追加するか、デフォルトのプロパティを編集します。詳細については、コネクタを構成するをご覧ください。

  9. [タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。

  10. [作成] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka connectors create コマンドを実行します。

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    次のように置き換えます。

    • CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。

    • LOCATION: コネクタを作成するロケーション。これは、Connect クラスタを作成したロケーションと同じである必要があります。

    • CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。

    • CONFIG_FILE: BigQuery Sink コネクタの YAML 構成ファイルへのパス。

    BigQuery Sink コネクタの構成ファイルの例を次に示します。

    name: "BQ_SINK_CONNECTOR_ID"
    project: "GCP_PROJECT_ID"
    topics: "GMK_TOPIC_ID"
    tasks.max: 3
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset: "BQ_DATASET_ID"
    

    次のように置き換えます。

    • BQ_SINK_CONNECTOR_ID: BigQuery Sink コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。

    • GCP_PROJECT_ID: BigQuery データセットが存在する Google Cloudプロジェクトの ID。

    • GMK_TOPIC_ID: データが BigQuery シンクコネクタに流れる Managed Service for Apache Kafka トピックの ID。

    • BQ_DATASET_ID: パイプラインのシンクとして機能する BigQuery データセットの ID。

  3. Terraform

    Terraform リソースを使用してコネクタを作成できます。

    resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-bigquery-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "name"                           = "my-bigquery-sink-connector"
        "project"                        = data.google_project.default.project_id
        "topics"                         = "GMK_TOPIC_ID"
        "tasks.max"                      = "3"
        "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "defaultDataset"                 = "BQ_DATASET_ID"
      }
    
      provider = google-beta
    }

    Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

    Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createBigQuerySinkConnector creates a BigQuery Sink connector.
    func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "BQ_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// tasksMax := "3"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// defaultDataset := "BQ_DATASET_ID"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// BigQuery Sink sample connector configuration
    	config := map[string]string{
    		"name":                           connectorID,
    		"project":                        projectID,
    		"topics":                         topics,
    		"tasks.max":                      tasksMax,
    		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    		"key.converter":                  keyConverter,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"defaultDataset":                 defaultDataset,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateBigQuerySinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-bigquery-sink-connector";
        String bigqueryProjectId = "my-bigquery-project-id";
        String datasetName = "my-dataset";
        String kafkaTopicName = "kafka-topic";
        String maxTasks = "3";
        String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        createBigQuerySinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bigqueryProjectId,
            datasetName,
            kafkaTopicName,
            maxTasks,
            connectorClass,
            keyConverter,
            valueConverter,
            valueSchemasEnable);
      }
    
      public static void createBigQuerySinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bigqueryProjectId,
          String datasetName,
          String kafkaTopicName,
          String maxTasks,
          String connectorClass,
          String keyConverter,
          String valueConverter,
          String valueSchemasEnable)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("name", connectorId);
        configMap.put("project", bigqueryProjectId);
        configMap.put("topics", kafkaTopicName);
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("key.converter", keyConverter);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("defaultDataset", datasetName);
    
        Connector connector =
            Connector.newBuilder()
                .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
                .putAllConfigs(configMap)
                .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request =
              CreateConnectorRequest.newBuilder()
                  .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
                  .setConnectorId(connectorId)
                  .setConnector(connector)
                  .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

コネクタを作成した後は、コネクタの編集、削除、一時停止、停止、再起動を行うことができます。

コネクタを構成する

このセクションでは、コネクタで設定できる構成プロパティについて説明します。このコネクタに固有のプロパティの一覧については、BigQuery Sink コネクタの構成をご覧ください。

テーブル名

デフォルトでは、コネクタはトピック名を BigQuery テーブル名として使用します。別のテーブル名を使用するには、次の形式で topic2TableMap プロパティを設定します。

topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...

テーブルの作成

BigQuery シンクコネクタは、宛先テーブルが存在しない場合に作成できます。

  • autoCreateTables=true の場合、コネクタは存在しない BigQuery テーブルの作成を試みます。この設定がデフォルトの動作です。

  • autoCreateTables=false の場合、コネクタはテーブルを作成しません。宛先テーブルが存在しない場合は、エラーが発生します。

autoCreateTablestrue の場合、次の構成プロパティを使用して、コネクタが新しいテーブルを作成して構成する方法をより詳細に制御できます。

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

これらのプロパティの詳細については、BigQuery Sink コネクタの構成をご覧ください。

Kafka メタデータ

kafkaDataFieldName フィールドと kafkaKeyFieldName フィールドをそれぞれ構成することで、メタデータ情報やキー情報などの Kafka からの追加データを BigQuery テーブルにマッピングできます。メタデータ情報の例としては、Kafka トピック、パーティション、オフセット、挿入時間などがあります。

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。