Criar um conector de gravador do Cloud Storage

Com os conectores de coletor do Cloud Storage, é possível transmitir dados dos tópicos do Kafka para buckets do Cloud Storage. Isso é útil para armazenar e processar grandes volumes de dados de maneira econômica e escalonável.

Antes de começar

Antes de criar um conector de coletor do Cloud Storage, verifique se você tem o seguinte:

Papéis e permissões necessárias

Para receber as permissões necessárias para criar um conector de gravador do Cloud Storage, peça ao administrador para conceder a você o papel do IAM Editor de conector gerenciado do Kafka (roles/managedkafka.connectorEditor) no seu projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um conector de gravador do Cloud Storage. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um conector de gravador do Cloud Storage:

  • Conceda a permissão para criar um conector no cluster pai do Connect: managedkafka.connectors.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre o papel Editor do conector gerenciado do Kafka, consulte Papéis predefinidos do Serviço Gerenciado para Apache Kafka.

Se o cluster do Serviço gerenciado para Apache Kafka estiver no mesmo projeto que o cluster do Connect, não serão necessárias outras permissões. Se o cluster do Connect estiver em um projeto diferente, consulte Criar um cluster do Connect em um projeto diferente.

Conceder permissões para gravar no bucket do Cloud Storage

A conta de serviço do cluster do Connect, que segue o formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requer as seguintes permissões do Cloud Storage:

  • storage.objects.create
  • storage.objects.delete

Para isso, conceda o papel Usuário de objetos do Storage (roles/storage.objectUser) à conta de serviço do cluster do Connect no projeto que contém o bucket do Cloud Storage.

Como funciona um conector de coletor do Cloud Storage

Um conector de coletor do Cloud Storage extrai dados de um ou mais tópicos do Kafka e grava esses dados em objetos em um único bucket do Cloud Storage.

Confira um detalhamento de como o conector de gravador do Cloud Storage copia dados:

  • O conector consome mensagens de um ou mais tópicos do Kafka no cluster de origem.

  • O conector grava os dados no bucket de destino do Cloud Storage especificado na configuração do conector.

  • O conector formata os dados à medida que os grava no bucket do Cloud Storage referenciando propriedades específicas na configuração do conector. Por padrão, os arquivos de saída estão no formato CSV. É possível configurar a propriedade format.output.type para especificar diferentes formatos de saída, como JSON.

  • O conector também nomeia os arquivos gravados no bucket do Cloud Storage. É possível personalizar os nomes dos arquivos usando as propriedades file.name.prefix e file.name.template. Por exemplo, é possível incluir o nome do tópico do Kafka ou as chaves de mensagem no nome do arquivo.

  • Um registro do Kafka tem três componentes: cabeçalhos, chaves e valores.

    • Para incluir cabeçalhos no arquivo de saída, defina format.output.fields. Por exemplo, format.output.fields=value,headers.

    • Para incluir chaves no arquivo de saída, defina format.output.fields para incluir key. Por exemplo, format.output.fields=key,value,headers.

      As chaves também podem ser usadas para agrupar registros incluindo key na propriedade file.name.template.

  • É possível incluir valores no arquivo de saída por padrão, já que format.output.fields é definido como value.

  • O conector grava os dados convertidos e formatados no bucket do Cloud Storage especificado.

  • O conector compacta os arquivos armazenados no bucket do Cloud Storage se você configurar a compactação de arquivos usando a propriedade file.compression.type.

  • As configurações do conversor são restritas pela propriedade format.output.type.

    • Por exemplo, quando format.output.type é definido como csv, o conversor de chave precisa ser org.apache.kafka.connect.converters.ByteArrayConverter ou org.apache.kafka.connect.storage.StringConverter, e o conversor de valor precisa ser org.apache.kafka.connect.converters.ByteArrayConverter.

    • Quando format.output.type é definido como json, o esquema de valor e chave não é gravado com os dados no arquivo de saída, mesmo que a propriedade value.converter.schemas.enable seja verdadeira.

  • A propriedade tasks.max controla o nível de paralelismo do conector. Aumentar tasks.max pode melhorar a capacidade, mas o paralelismo real é limitado pelo número de partições nos tópicos do Kafka.

Propriedades de um conector de coletor do Cloud Storage

Ao criar um conector de coletor do Cloud Storage, especifique as seguintes propriedades.

Nome do conector

O nome ou ID do conector. Para conferir as diretrizes sobre como nomear o recurso, consulte Diretrizes para nomear um recurso do Serviço gerenciado para Apache Kafka. O nome é imutável.

Tipo de plug-in do conector

Selecione Gravador do Cloud Storage como o tipo de plug-in do conector no consoleGoogle Cloud . Se você não usar a interface do usuário para configurar o conector, também precisará especificar a classe dele.

Tópicos

Os tópicos do Kafka de que o conector consome mensagens. É possível especificar um ou mais temas ou usar uma expressão regular para corresponder a vários temas. Por exemplo, topic.* para corresponder a todos os tópicos que começam com "tópico". Esses tópicos precisam existir no cluster do Serviço gerenciado para Apache Kafka associado ao seu cluster do Connect.

Bucket do Cloud Storage

Escolha ou crie o bucket do Cloud Storage onde os dados serão armazenados.

Configuração

Nesta seção, é possível especificar outras propriedades de configuração específicas do conector para o conector de gravador do Cloud Storage.

Como os dados nos tópicos do Kafka podem estar em vários formatos, como Avro, JSON ou bytes brutos, uma parte fundamental da configuração envolve especificar conversores. Os conversores traduzem dados do formato usado nos tópicos do Kafka para o formato interno padronizado do Kafka Connect. Em seguida, o conector de gravador do Cloud Storage usa esses dados internos e os transforma no formato exigido pelo bucket do Cloud Storage antes de gravar.

Para mais informações gerais sobre a função dos conversores no Kafka Connect, tipos de conversores compatíveis e opções de configuração comuns, consulte conversores.

Confira algumas configurações específicas do conector de gravador do Cloud Storage:

  • gcs.credentials.default: se as credenciais Google Cloud serão descobertas automaticamente no ambiente de execução. Precisa ser definido como true.

  • gcs.bucket.name: especifica o nome do bucket do Cloud Storage em que os dados são gravados. Obrigatório.

  • file.compression.type: define o tipo de compactação para arquivos armazenados no bucket do Cloud Storage. Exemplos: gzip, snappy, zstd e none. O valor padrão é none.

  • file.name.prefix: o prefixo a ser adicionado ao nome de cada arquivo armazenado no bucket do Cloud Storage. O valor padrão é vazio.

  • format.output.type: o tipo de formato de dados usado para gravar dados nos arquivos de saída do Cloud Storage. Os valores aceitos são: csv, json, jsonl e parquet. O valor padrão é csv.

Para uma lista das propriedades de configuração disponíveis específicas para esse conector, consulte as configurações do conector de gravador do Cloud Storage.

Criar um conector de coletor do Cloud Storage

Antes de criar um conector, consulte a documentação sobre as Propriedades de um conector de coletor do Cloud Storage.

Console

  1. No console do Google Cloud , acesse a página Conectar clusters.

    Acessar o Connect Clusters

  2. Clique no cluster do Connect para o qual você quer criar o conector.

    A página Detalhes do cluster de conexão é exibida.

  3. Clique em Criar conector.

    A página Criar conector do Kafka é exibida.

  4. Para o nome do conector, insira uma string.

    Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka.

  5. Em Plug-in do conector, selecione Gravador do Cloud Storage.

  6. Especifique os tópicos de onde você pode transmitir dados.

  7. Escolha o bucket de armazenamento para armazenar os dados.

  8. (Opcional) Configure outras opções na seção Configuração.

  9. Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.

  10. Clique em Criar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Substitua:

    • CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.

    • LOCATION: o local em que você cria o conector. Precisa ser o mesmo local em que você criou o cluster do Connect.

    • CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector é criado.

    • CONFIG_FILE: o caminho para o arquivo de configuração YAML do conector de coletor do BigQuery.

    Confira um exemplo de arquivo de configuração para o conector de gravador do Cloud Storage:

    connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    gcs.bucket.name: "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    name: "GCS_SINK_CONNECTOR_ID"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    

    Substitua:

    • GMK_TOPIC_ID: o ID do tópico do Serviço gerenciado para Apache Kafka de onde os dados fluem para o conector de coletor do Cloud Storage.

    • GCS_BUCKET_NAME: o nome do bucket do Cloud Storage que atua como um coletor para o pipeline.

    • GCS_SINK_CONNECTOR_ID: o ID ou nome do conector de gravador do Cloud Storage. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.

  3. Terraform

    É possível usar um recurso do Terraform para criar um conector.

    resource "google_managed_kafka_connector" "example-cloud-storage-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-gcs-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"                = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        "tasks.max"                      = "3"
        "topics"                         = "GMK_TOPIC_ID"
        "gcs.bucket.name"                = "GCS_BUCKET_NAME"
        "gcs.credentials.default"        = "true"
        "format.output.type"             = "json"
        "name"                           = "my-gcs-sink-connector"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
      }
      provider = google-beta
    }

    Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

    Go

    Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createCloudStorageSinkConnector creates a Cloud Storage Sink connector.
    func createCloudStorageSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, gcsBucketName, tasksMax, formatOutputType, valueConverter, valueConverterSchemasEnable, keyConverter, gcsCredentialsDefault string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "GCS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// gcsBucketName := "GCS_BUCKET_NAME"
    	// tasksMax := "3"
    	// formatOutputType := "json"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// gcsCredentialsDefault := "true"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":                "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max":                      tasksMax,
    		"topics":                         topics,
    		"gcs.bucket.name":                gcsBucketName,
    		"gcs.credentials.default":        gcsCredentialsDefault,
    		"format.output.type":             formatOutputType,
    		"name":                           connectorID,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"key.converter":                  keyConverter,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Cloud Storage sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateCloudStorageSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-gcs-sink-connector";
        String bucketName = "my-gcs-bucket";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
        String maxTasks = "3";
        String gcsCredentialsDefault = "true";
        String formatOutputType = "json";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createCloudStorageSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bucketName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            gcsCredentialsDefault,
            formatOutputType,
            valueConverter,
            valueSchemasEnable,
            keyConverter);
      }
    
      public static void createCloudStorageSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bucketName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String gcsCredentialsDefault,
          String formatOutputType,
          String valueConverter,
          String valueSchemasEnable,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("gcs.bucket.name", bucketName);
        configMap.put("gcs.credentials.default", gcsCredentialsDefault);
        configMap.put("format.output.type", formatOutputType);
        configMap.put("name", connectorId);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("key.converter", keyConverter);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Depois de criar um conector, é possível editar, excluir, pausar, interromper ou reiniciar.

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.