创建 BigQuery Sink 连接器

借助 BigQuery 接收器连接器,您可以将数据从 Kafka 流式传输到 BigQuery,从而在 BigQuery 中实现实时数据注入和分析。BigQuery 接收器连接器会从一个或多个 Kafka 主题中提取记录,并将数据写入单个 BigQuery 数据集中的一个或多个表。

准备工作

在创建 BigQuery 接收器连接器之前,请确保您具备以下条件:

所需的角色和权限

如需获得创建 BigQuery Sink 连接器所需的权限,请让您的管理员为您授予项目的 Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建 BigQuery Sink 连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能创建 BigQuery Sink 连接器:

  • 在父级 Connect 集群上授予创建连接器的权限: managedkafka.connectors.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Managed Service for Apache Kafka 预定义角色

如果您的 Managed Service for Apache Kafka 集群与 Connect 集群位于同一项目中,则无需进一步授予权限。如果集群位于其他项目中,请参阅在其他项目中创建连接集群

授予向 BigQuery 表写入数据的权限

Connect 集群服务账号(格式为 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com)需要具有写入 BigQuery 表的权限。为此,请向包含 BigQuery 表的项目中的 Connect 集群服务账号授予 BigQuery Data Editor (roles/bigquery.dataEditor) 角色。

BigQuery 接收器连接器的架构

BigQuery 接收器连接器使用配置的值转换器 (value.converter) 将 Kafka 记录值解析为字段。然后,它会将这些字段写入 BigQuery 表中具有相同名称的列。

连接器需要架构才能运行。您可以通过以下方式提供架构:

接下来的部分将介绍这些选项。

基于消息的架构

在此模式下,每个 Kafka 记录都包含一个 JSON 架构。连接器使用架构将记录数据写入为 BigQuery 表行。

如需使用基于消息的架构,请在连接器上设置以下属性:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Kafka 记录值示例:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "int64",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

如果目标表已存在,则 BigQuery 表架构必须与嵌入式消息架构兼容。如果值为 autoCreateTables=true,连接器会在需要时自动创建目标表。如需了解详情,请参阅创建表格

如果您希望连接器在消息架构发生变化时更新 BigQuery 表架构,请将 allowNewBigQueryFieldsallowSchemaUnionizationallowBigQueryRequiredFieldRelaxation 设置为 true

基于表格的架构

在此模式下,Kafka 记录包含没有明确架构的纯 JSON 数据。连接器会根据目标表推断架构。

要求:

  • BigQuery 表必须已存在。
  • Kafka 记录数据必须与表架构兼容。
  • 此模式不支持根据传入的消息动态更新架构。

如需使用基于表的架构,请在连接器上设置以下属性:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

如果 BigQuery 表使用基于时间的分区(采用每日分区),则 bigQueryPartitionDecorator 可以是 true。否则,请将此属性设置为 false

Kafka 记录值示例:

{
  "user": "userId",
  "age": 30
}

架构注册表

在此模式下,每条 Kafka 记录都包含 Apache Avro 数据,并且消息架构存储在架构注册表中。

如需将 BigQuery Sink 连接器与架构注册表搭配使用,请在连接器上设置以下属性:

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

SCHEMA_REGISTRY_URL 替换为架构注册表的网址。

如需将连接器与 Managed Service for Apache Kafka 架构注册表搭配使用,请设置以下属性:

  • value.converter.bearer.auth.credentials.source=GCP

如需了解详情,请参阅将 Kafka Connect 与架构注册表搭配使用

BigQuery 中的 Apache Iceberg BigLake 表

BigQuery Sink 连接器支持将适用于 Apache Iceberg 的 BigQuery BigLake 表(以下称为“BigQuery 中的 BigLake Iceberg 表”)作为接收器目标。

BigQuery 中的 BigLake Iceberg 表为在 Google Cloud上构建开放格式湖仓一体提供了基础。BigQuery 中的 BigLake Iceberg 表提供与 BigQuery 表相同的全代管式体验,但使用 Parquet 将数据存储在客户拥有的存储分区中,以便与 Apache Iceberg 开放表格式进行互操作。

如需了解如何创建 Apache Iceberg 表,请参阅创建 Apache Iceberg 表

创建 BigQuery 接收器连接器

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击要创建连接器的 Connect 集群。

  3. 点击创建连接器

  4. 对于连接器名称,请输入一个字符串。

    有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南

  5. 对于连接器插件,选择 BigQuery Sink

  6. 主题部分中,指定要从中读取数据的 Kafka 主题。您可以指定主题列表或正则表达式,以匹配主题名称。

    • 方法 1:选择选择一个 Kafka 主题列表。在 Kafka 主题列表中,选择一个或多个主题。点击确定

    • 方法 2:选择使用主题正则表达式。在主题正则表达式字段中,输入正则表达式。

  7. 点击数据集,然后指定一个 BigQuery 数据集。您可以选择现有数据集,也可以创建新数据集。

  8. 可选:在配置框中,添加配置属性或修改默认属性。如需了解详情,请参阅配置连接器

  9. 选择任务重启政策。如需了解详情,请参阅任务重启政策

  10. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka connectors create 命令:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    替换以下内容:

    • CONNECTOR_ID:连接器的 ID 或名称。 有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。

    • LOCATION:您创建连接器的位置。此位置必须与您创建 Connect 集群的位置相同。

    • CONNECT_CLUSTER_ID:创建连接器的 Connect 集群的 ID。

    • CONFIG_FILE:BigQuery Sink 连接器的 YAML 配置文件路径。

    以下是 BigQuery Sink 连接器的配置文件示例:

    name: "BQ_SINK_CONNECTOR_ID"
    project: "GCP_PROJECT_ID"
    topics: "GMK_TOPIC_ID"
    tasks.max: 3
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset: "BQ_DATASET_ID"
    

    替换以下内容:

    • BQ_SINK_CONNECTOR_ID:BigQuery Sink 连接器的 ID 或名称。如需了解如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。 连接器的名称不可变。

    • GCP_PROJECT_ID:BigQuery 数据集所在的 Google Cloud项目的 ID。

    • GMK_TOPIC_ID:Managed Service for Apache Kafka 主题的 ID,数据从该主题流向 BigQuery Sink 连接器。

    • BQ_DATASET_ID:作为流水线接收器的 BigQuery 数据集的 ID。

  3. Terraform

    您可以使用 Terraform 资源创建连接器

    resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-bigquery-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "name"                           = "my-bigquery-sink-connector"
        "project"                        = data.google_project.default.project_id
        "topics"                         = "GMK_TOPIC_ID"
        "tasks.max"                      = "3"
        "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "defaultDataset"                 = "BQ_DATASET_ID"
      }
    
      provider = google-beta
    }

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createBigQuerySinkConnector creates a BigQuery Sink connector.
    func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "BQ_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// tasksMax := "3"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// defaultDataset := "BQ_DATASET_ID"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// BigQuery Sink sample connector configuration
    	config := map[string]string{
    		"name":                           connectorID,
    		"project":                        projectID,
    		"topics":                         topics,
    		"tasks.max":                      tasksMax,
    		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    		"key.converter":                  keyConverter,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"defaultDataset":                 defaultDataset,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateBigQuerySinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-bigquery-sink-connector";
        String bigqueryProjectId = "my-bigquery-project-id";
        String datasetName = "my-dataset";
        String kafkaTopicName = "kafka-topic";
        String maxTasks = "3";
        String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        createBigQuerySinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bigqueryProjectId,
            datasetName,
            kafkaTopicName,
            maxTasks,
            connectorClass,
            keyConverter,
            valueConverter,
            valueSchemasEnable);
      }
    
      public static void createBigQuerySinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bigqueryProjectId,
          String datasetName,
          String kafkaTopicName,
          String maxTasks,
          String connectorClass,
          String keyConverter,
          String valueConverter,
          String valueSchemasEnable)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("name", connectorId);
        configMap.put("project", bigqueryProjectId);
        configMap.put("topics", kafkaTopicName);
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("key.converter", keyConverter);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("defaultDataset", datasetName);
    
        Connector connector =
            Connector.newBuilder()
                .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
                .putAllConfigs(configMap)
                .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request =
              CreateConnectorRequest.newBuilder()
                  .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
                  .setConnectorId(connectorId)
                  .setConnector(connector)
                  .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

创建连接器后,您可以修改、删除、暂停、停止或重启连接器。

配置连接器

本部分介绍了一些可在连接器上设置的配置属性。如需查看此连接器特有的属性的完整列表,请参阅 BigQuery Sink 连接器配置

表名称

默认情况下,连接器使用主题名称作为 BigQuery 表名称。如需使用其他表名称,请按以下格式设置 topic2TableMap 属性:

topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...

创建表格

如果目标表不存在,BigQuery 接收器连接器可以创建这些表。

  • 如果值为 autoCreateTables=true,连接器会尝试创建任何不存在的 BigQuery 表。此设置是默认行为。

  • 如果值为 autoCreateTables=false,连接器不会创建任何表。如果目标表不存在,则会发生错误。

autoCreateTablestrue 时,您可以使用以下配置属性来更精细地控制连接器创建和配置新表的方式:

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

如需了解这些属性,请参阅 BigQuery Sink 连接器配置

Kafka 元数据

您可以通过分别配置 kafkaDataFieldNamekafkaKeyFieldName 字段,将 Kafka 中的其他数据(例如元数据信息和键信息)映射到 BigQuery 表中。元数据信息的示例包括 Kafka 主题、分区、偏移量和插入时间。

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。