建立 Cloud Storage Sink 連接器

Cloud Storage 接收器連接器可將 Kafka 主題的資料串流至 Cloud Storage bucket。這項功能有助於以經濟實惠且可擴充的方式,儲存及處理大量資料。

事前準備

建立 Cloud Storage 接收器連接器前,請確認您已備妥下列項目:

必要角色和權限

如要取得建立 Cloud Storage Sink 連接器所需的權限,請要求管理員授予您專案的受管理 Kafka 連接器編輯者 (roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備建立 Cloud Storage 接收器連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立 Cloud Storage Sink 連接器,必須具備下列權限:

  • 在上層 Connect 叢集上授予建立連接器的權限: managedkafka.connectors.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解「Managed Kafka Connector Editor」角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

如果 Managed Service for Apache Kafka 叢集與 Connect 叢集位於同一個專案,則不需要其他權限。如果 Connect 叢集位於其他專案,請參閱「在其他專案中建立 Connect 叢集」。

授予寫入 Cloud Storage bucket 的權限

Connect 叢集服務帳戶 (格式為 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com) 必須具備下列 Cloud Storage 權限:

  • storage.objects.create
  • storage.objects.delete

如要這麼做,請在包含 Cloud Storage 值區的專案中,將「Storage 物件使用者」 (roles/storage.objectUser) 角色授予 Connect 叢集服務帳戶。

Cloud Storage 接收器連接器的運作方式

Cloud Storage 接收器連接器會從一或多個 Kafka 主題提取資料,並將資料寫入單一 Cloud Storage bucket 中的物件。

以下詳細說明 Cloud Storage Sink 連接器如何複製資料:

  • 連接器會取用來源叢集內一或多個 Kafka 主題的訊息。

  • 連接器會將資料寫入您在連接器設定中指定的目標 Cloud Storage 值區。

  • 連接器會參照連接器設定中的特定屬性,在將資料寫入 Cloud Storage bucket 時格式化資料。根據預設,輸出檔案為 CSV 格式。您可以設定 format.output.type 屬性,指定不同的輸出格式,例如 JSON。

  • 連接器也會為寫入 Cloud Storage bucket 的檔案命名。您可以使用 file.name.prefixfile.name.template 屬性自訂檔案名稱。舉例來說,您可以在檔案名稱中加入 Kafka 主題名稱或訊息金鑰。

  • Kafka 記錄包含三個元件:標頭、鍵、值。

    • 您可以將 format.output.fields 設為包含標頭,在輸出檔案中加入標頭。例如:format.output.fields=value,headers

    • 如要將金鑰納入輸出檔案,請將 format.output.fields 設為包含 key。例如:format.output.fields=key,value,headers

      您也可以在 file.name.template 屬性中加入 key,依鍵分組記錄。

  • 根據預設,您可以將值納入輸出檔案,因為 format.output.fields 預設為 value

  • 連接器會將轉換及格式化後的資料寫入指定的 Cloud Storage 值區。

  • 如果您使用 file.compression.type 屬性設定檔案壓縮,連接器會壓縮儲存在 Cloud Storage bucket 中的檔案。

  • 轉換器設定受 format.output.type 屬性限制。

    • 舉例來說,如果 format.output.type 設為 csv,則鍵轉換器必須是 org.apache.kafka.connect.converters.ByteArrayConverterorg.apache.kafka.connect.storage.StringConverter,值轉換器必須是 org.apache.kafka.connect.converters.ByteArrayConverter

    • 如果 format.output.type 設為 json,即使 value.converter.schemas.enable 屬性為 true,值和鍵結構定義也不會寫入輸出檔案中的資料。

  • tasks.max 屬性可控制連接器的平行處理層級。增加 tasks.max 可提高總處理量,但實際的平行處理能力會受限於 Kafka 主題中的分區數量。

Cloud Storage 接收器連接器的屬性

建立 Cloud Storage 接收器連接器時,請指定下列屬性。

連接器名稱

連接器的名稱或 ID。如要查看資源命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 名稱無法變更。

連接器外掛程式類型

在Google Cloud 控制台中,選取「Cloud Storage Sink」做為連接器外掛程式類型。如果您未使用使用者介面設定連接器,則必須指定連接器類別。

主題

連接器從中取用訊息的 Kafka 主題。 您可以指定一或多個主題,也可以使用規則運算式比對多個主題。例如,topic.* 可比對所有以「topic」開頭的主題。這些主題必須位於與 Connect 叢集相關聯的 Managed Service for Apache Kafka 叢集內。

Cloud Storage 值區

選擇或建立儲存資料的 Cloud Storage bucket。

設定

本節可讓您為 Cloud Storage Sink 連接器指定其他連接器專屬的設定屬性。

由於 Kafka 主題中的資料格式可能不一,例如 Avro、JSON 或原始位元組,因此設定的關鍵部分是指定轉換器。轉換器會將 Kafka 主題中使用的格式資料,轉換為 Kafka Connect 的標準內部格式。接著,Cloud Storage Sink 連接器會取得這項內部資料,並轉換成 Cloud Storage bucket 要求的格式,然後再寫入資料。

如要進一步瞭解轉換器在 Kafka Connect 中的角色、支援的轉換器類型和常見設定選項,請參閱轉換器

以下是 Cloud Storage 接收器連接器專屬的一些設定:

  • gcs.credentials.default:是否要從執行環境自動探索 Google Cloud 憑證。必須設為「true」。

  • gcs.bucket.name:指定要寫入資料的 Cloud Storage bucket 名稱。必須設定。

  • file.compression.type:設定儲存在 Cloud Storage bucket 中的檔案壓縮類型。例如 gzipsnappyzstdnone。預設值為 none

  • file.name.prefix:要加到 Cloud Storage bucket 中每個檔案名稱的前置字元。預設值為空白。

  • format.output.type:用於將資料寫入 Cloud Storage 輸出檔案的資料格式類型。支援的值為:csvjsonjsonlparquet。預設值為 csv

如需這個連接器可用的特定設定屬性清單,請參閱「Cloud Storage Sink 連接器設定」。

建立 Cloud Storage 接收器連接器

建立連接器前,請先參閱「Cloud Storage 接收器連接器屬性」說明文件。

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要建立連接器的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 按一下「Create connector」(建立連接器)。

    系統會顯示「建立 Kafka 連接器」頁面。

  4. 輸入連接器名稱字串。

    如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南

  5. 在「Connector plugin」(連接器外掛程式) 部分,選取「Cloud Storage Sink」(Cloud Storage 接收器)

  6. 指定可串流資料的主題

  7. 選擇要儲存資料的「Storage Bucket」(儲存空間值區)

  8. (選用) 在「設定」部分中,調整其他設定。

  9. 選取「任務重新啟動政策」。詳情請參閱「工作重新啟動政策」。

  10. 點選「建立」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud managed-kafka connectors create 指令:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    更改下列內容:

    • CONNECTOR_ID:連接器的 ID 或名稱。 如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。

    • LOCATION:建立連接器的位置。這個位置必須與您建立 Connect 叢集的位置相同。

    • CONNECT_CLUSTER_ID:建立連接器的 Connect 叢集 ID。

    • CONFIG_FILE:BigQuery Sink 連接器的 YAML 設定檔路徑。

    以下是 Cloud Storage Sink 連接器的設定檔範例:

    connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    gcs.bucket.name: "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    name: "GCS_SINK_CONNECTOR_ID"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    

    更改下列內容:

    • GMK_TOPIC_ID:Managed Service for Apache Kafka 主題的 ID,資料會從該主題流向 Cloud Storage 接收器連接器。

    • GCS_BUCKET_NAME:做為管線接收器的 Cloud Storage bucket 名稱。

    • GCS_SINK_CONNECTOR_ID:Cloud Storage Sink 連接器的 ID 或名稱。如要查看連接器命名準則,請參閱Managed Service for Apache Kafka 資源命名指南。連接器名稱無法變更。

  3. Terraform

    您可以使用 Terraform 資源建立連接器

    resource "google_managed_kafka_connector" "example-cloud-storage-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-gcs-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"                = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        "tasks.max"                      = "3"
        "topics"                         = "GMK_TOPIC_ID"
        "gcs.bucket.name"                = "GCS_BUCKET_NAME"
        "gcs.credentials.default"        = "true"
        "format.output.type"             = "json"
        "name"                           = "my-gcs-sink-connector"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
      }
      provider = google-beta
    }

    如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createCloudStorageSinkConnector creates a Cloud Storage Sink connector.
    func createCloudStorageSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, gcsBucketName, tasksMax, formatOutputType, valueConverter, valueConverterSchemasEnable, keyConverter, gcsCredentialsDefault string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "GCS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// gcsBucketName := "GCS_BUCKET_NAME"
    	// tasksMax := "3"
    	// formatOutputType := "json"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// gcsCredentialsDefault := "true"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":                "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max":                      tasksMax,
    		"topics":                         topics,
    		"gcs.bucket.name":                gcsBucketName,
    		"gcs.credentials.default":        gcsCredentialsDefault,
    		"format.output.type":             formatOutputType,
    		"name":                           connectorID,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"key.converter":                  keyConverter,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Cloud Storage sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateCloudStorageSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-gcs-sink-connector";
        String bucketName = "my-gcs-bucket";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
        String maxTasks = "3";
        String gcsCredentialsDefault = "true";
        String formatOutputType = "json";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createCloudStorageSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bucketName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            gcsCredentialsDefault,
            formatOutputType,
            valueConverter,
            valueSchemasEnable,
            keyConverter);
      }
    
      public static void createCloudStorageSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bucketName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String gcsCredentialsDefault,
          String formatOutputType,
          String valueConverter,
          String valueSchemasEnable,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("gcs.bucket.name", bucketName);
        configMap.put("gcs.credentials.default", gcsCredentialsDefault);
        configMap.put("format.output.type", formatOutputType);
        configMap.put("name", connectorId);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("key.converter", keyConverter);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

建立連接器後,您可以編輯、刪除、暫停、停止或重新啟動連接器。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。