Criar um cluster do Connect

Um cluster do Connect oferece um ambiente para conectores que ajuda a mover dados de implantações do Kafka para um cluster do serviço gerenciado do Google Cloud para Apache Kafka ou de um cluster do serviço gerenciado para Apache Kafka para outro serviço Google Cloud ou cluster do Kafka. O cluster secundário do Kafka pode ser outro cluster do Serviço gerenciado do Google Cloud para Apache Kafka, um cluster autogerenciado ou um local.

Antes de começar

Verifique se você já criou um cluster do Managed Service para Apache Kafka. Você precisa do nome do cluster do Serviço gerenciado para Apache Kafka a que o cluster do Connect será anexado.

Cada cluster do Connect está associado a um cluster do Serviço gerenciado para Apache Kafka. Esse cluster armazena o estado dos conectores em execução no cluster do Connect.

Papéis e permissões necessários para criar um cluster do Connect

Para receber as permissões necessárias para criar um cluster do Connect, peça ao administrador para conceder a você o papel do IAM de Editor de cluster gerenciado do Kafka Connect (roles/managedkafka.connectClusterEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um cluster do Connect. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um cluster do Connect:

  • Conceda a permissão para criar um cluster do Connect no local especificado: managedkafka.connectClusters.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre essa função, consulte Funções predefinidas do Managed Service para Apache Kafka.

Principais da ACL obrigatórios

Por padrão, os clusters do Serviço gerenciado para Apache Kafka permitem que o cluster do Connect acesse recursos se nenhuma ACL estiver configurada. Isso é feito ao definir allow.everyone.if.no.acl.found como true, que é a configuração padrão.

No entanto, se o cluster do Serviço gerenciado para Apache Kafka tiver ACLs configuradas, o cluster do Connect não terá automaticamente as permissões de leitura e gravação para os recursos. Você precisa concedê-las manualmente.

A conta de serviço do cluster do Connect usada como principal em ACLs segue este formato: User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com.

Se você configurou ACLs no cluster do Kafka, conceda ao cluster do Connect permissões de leitura e gravação para tópicos e permissões de leitura para grupos de consumidores usando os seguintes comandos:

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

Para mais informações sobre esses comandos, consulte Configurar ACLs do Apache Kafka para controle de acesso granular.

Criar um cluster do Connect em outro projeto

Ao criar um cluster do Connect, ele compartilha o mesmo agente de serviço com o cluster do Serviço gerenciado para Apache Kafka que está no mesmo projeto. Se este cluster do Serviço Gerenciado para Apache Kafka for designado como o cluster principal do Kafka anexado ao cluster do Connect, não serão necessárias outras permissões.

O agente de serviço tem o formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com. O número do projeto é do projeto que contém o cluster do Connect e o cluster do Serviço gerenciado para Apache Kafka.

Se o cluster do Connect estiver no projeto A e o cluster associado do Serviço gerenciado para Apache Kafka estiver no projeto B, siga estas etapas:

  1. Verifique se a API Managed Kafka está ativada nos projetos A e B.

    Ativar a API

  2. Identifique o agente de serviço do cluster do Connect no projeto A.

    O agente de serviço tem o formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com.

  3. No projeto B, conceda à conta de serviço do cluster do Connect o papel de cliente do Kafka gerenciado (roles/managedkafka.client).

    Essa função concede as permissões necessárias para se conectar ao cluster do Serviço Gerenciado para Apache Kafka e realizar operações como leitura e gravação de dados.

    Para mais informações sobre como conceder o papel, consulte Criar e conceder papéis a agentes de serviço.

Sempre siga o princípio de privilégio mínimo ao conceder permissões. Conceda apenas as permissões necessárias para garantir a segurança e evitar acessos não autorizados.

Propriedades de um cluster do Connect

Nesta seção, descrevemos as propriedades de um cluster do Connect.

Nome do cluster Connect

O nome do cluster do Connect que você está criando. Para conferir as diretrizes de nomeação de clusters Connect, acesse Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome de um cluster é imutável.

Cluster principal do Kafka

O cluster do Serviço gerenciado para Apache Kafka associado ao seu cluster do Connect. O cluster associado (cluster principal) armazena o estado dos conectores em execução no cluster do Connect. Em geral, o cluster principal do Serviço gerenciado para Apache Kafka também serve como destino para todos os conectores de origem e entrada para todos os conectores de coletor em execução no cluster do Connect.

Um único cluster do Serviço Gerenciado para Apache Kafka pode ter vários clusters do Connect. Se você escolher um cluster do Managed Service para Apache Kafka em um projeto diferente, verifique se as permissões adequadas estão configuradas.

Não é possível atualizar para um cluster do Kafka diferente depois de criar o cluster do Connect.

Benefícios da colocalização de regiões para latência e custos de rede

Colocar os clusters do Serviço gerenciado para Apache Kafka e do Connect na mesma região reduz a latência e os custos de rede. Por exemplo, suponha que seu cluster do Serviço gerenciado para Apache Kafka esteja em region-a e você esteja usando um conector de coletor para gravar dados desse cluster (origem) em uma tabela do BigQuery (coletor) que também esteja em region-a. Se você implantar o cluster do Connect em region-a, essa escolha de implantação vai minimizar a latência da operação de gravação do BigQuery e eliminar os custos de transferência de rede entre regiões do cluster do Serviço gerenciado para Apache Kafka e o cluster do Connect.

Considerações sobre custo e latência de vários sistemas

O Kafka Connect usa conectores para mover dados entre sistemas. Um lado do conector sempre interage com um cluster do Serviço Gerenciado para Apache Kafka. Um único cluster do Kafka Connect pode executar vários conectores, cada um atuando como uma fonte (extraindo dados de um sistema) ou um coletor (enviando dados para um sistema).

Embora um cluster do Connect na mesma região que o cluster do Serviço gerenciado para Apache Kafka se beneficie de uma latência de comunicação menor entre eles, cada conector também interage com outro sistema, como uma tabela do BigQuery ou outro cluster do Kafka. Mesmo que o cluster do Connect e o cluster do Serviço Gerenciado para Apache Kafka estejam no mesmo local, esse outro sistema pode estar em uma região diferente. Isso leva a maior latência e custo. A latência geral do pipeline depende dos locais dos três sistemas: o cluster do Serviço gerenciado para Apache Kafka, o cluster do Connect e o sistema de origem ou destino.

Por exemplo, se o cluster do Serviço gerenciado para Apache Kafka estiver em region-a, o cluster do Connect em region-b e você estiver usando um conector do Cloud Storage para um bucket em region-c, será cobrado por dois saltos de rede (region-a para region-b e depois region-b para region-c ou o inverso, dependendo da direção do conector).

Considere cuidadosamente todas as regiões envolvidas ao planejar o posicionamento do cluster do Connect para otimizar a latência e o custo.

Configuração de capacidade

A configuração da capacidade exige que você defina o número de vCPUs e a quantidade de memória para cada vCPU do cluster do Connect. É possível atualizar a capacidade de um cluster do Connect depois de criá-lo. Confira abaixo as propriedades para configuração de capacidade:

  • vCPUs: o número de vCPUs atribuídas a um cluster do Connect. O valor mínimo é 3 vCPUs.

  • Memória: a quantidade de memória atribuída a cada vCPU. É necessário provisionar entre 1 GiB e 8 GiB por vCPU. É possível aumentar ou diminuir a quantidade de memória dentro desses limites depois que o cluster é criado.

    Por exemplo, se você criar um cluster com 6 vCPUs, a memória mínima que poderá alocar para o cluster será de 6 GiB (1 GiB por vCPU), e a máxima será de 48 GiB (8 GiB por vCPU).

A vCPU e a memória alocadas para cada worker em um cluster do Connect têm um impacto significativo no desempenho, na capacidade e no custo do cluster. Confira um resumo de como a vCPU e a memória afetam um cluster do Connect.

Contagem de vCPU

  • O Kafka Connect divide o trabalho de um conector em tarefas. Cada tarefa pode processar dados em paralelo. Mais vCPUs significam que mais tarefas podem ser executadas simultaneamente, o que leva a uma maior capacidade de processamento.

  • Mais vCPUs aumentam os custos do cluster do Connect.

Memória

  • O Kafka Connect usa memória para armazenar dados em buffer à medida que eles fluem entre conectores e o Serviço gerenciado para Apache Kafka. Uma memória maior permite buffers maiores. Uma memória grande pode melhorar a capacidade de transferência, especialmente para fluxos de dados de alto volume. Os conectores que lidam com mensagens ou registros muito grandes exigem memória suficiente para processá-los sem encontrar exceções OutOfMemoryError.

  • Mais memória aumenta o custo do cluster do Connect.

  • Se você estiver usando uma lógica de transformação pesada, vai precisar de mais alocação de memória.

Seu objetivo é escolher a configuração de capacidade certa para seu cluster do Connect. Para isso, é preciso entender a capacidade de processamento que seu cluster do Connect pode lidar.

Sub-rede de worker (principal)

A sub-rede de worker, também conhecida como sub-rede principal, conecta sua rede VPC ao cluster do Connect. Essa sub-rede permite que os trabalhadores do cluster alcancem os endpoints de fontes e receptores na rede do consumidor, como clusters do Serviço Gerenciado para Apache Kafka ou clusters do Kafka auto-hospedados.

Confira alguns requisitos para configurar a sub-rede de worker:

  • A sub-rede do worker é obrigatória.

  • A sub-rede precisa estar na mesma região do cluster do Connect.

  • A sub-rede precisa estar na mesma VPC principal que uma das sub-redes conectadas do cluster principal do Kafka.

  • O intervalo de CIDR da sub-rede precisa ter um tamanho mínimo de /22 (1.024 endereços).

Os workers do cluster recebem endereços IP na sub-rede de worker usando uma interface do Private Service Connect. Os workers podem alcançar qualquer destino de rede acessível na rede VPC da sub-rede, com os seguintes requisitos:

  • O endpoint não pode estar no intervalo CIDR 172.16.0.0/14. Esse intervalo é reservado para uso interno do Serviço gerenciado para Apache Kafka Connect.
  • As regras de firewall precisam permitir o tráfego. Consulte Configurar a segurança para anexos de rede.
  • Para o tráfego da Internet, configure um Cloud NAT. Por exemplo, um Cloud NAT é necessário para que um conector do MirrorMaker replique dados de um cluster do Kafka acessível pela Internet.
  • Para acessar endpoints do Private Service Connect que estão em uma VPC diferente da VPC da sub-rede do worker, verifique se você está usando uma configuração de consumidor compatível (por exemplo, NCC). Para mais informações, consulte Sobre como acessar serviços publicados por meio de endpoints.

Domínios DNS solucionáveis

Os domínios DNS resolvíveis, também conhecidos como nomes de domínio DNS, permitem que os endereços DNS na rede VPC do consumidor sejam disponibilizados para a VPC do locatário. Isso permite que o cluster do Connect resolva nomes DNS para endereços IP, facilitando a comunicação com outros serviços, incluindo outros clusters do Kafka para conectores do MirrorMaker.

Para domínios DNS resolvíveis, selecione um cluster do Serviço gerenciado para Apache Kafka. Não é necessário configurar o nome de domínio DNS para o cluster principal do Serviço gerenciado para Apache Kafka, já que o endereço de bootstrap dele é incluído automaticamente na lista de domínios DNS solucionáveis.

No entanto, também é possível especificar um domínio DNS manualmente, o que é necessário se você selecionar um cluster externo do Kafka. O domínio de DNS do cluster principal do Serviço Gerenciado para Apache Kafka é incluído automaticamente. Outros clusters do Kafka ainda exigem a configuração de domínios DNS.

Recursos do Secret Manager

Especifique o Secret Manager para carregar nos workers. Esses secrets são armazenados com segurança no Secret Manager e disponibilizados para seu cluster do Connect.

Você pode usar o Secret Manager nas configurações de conector. Por exemplo, é possível carregar um arquivo de chave no cluster do Connect e fazer com que o conector leia o arquivo. Os Secret Managers são montados como arquivos em workers.

Os clusters conectados se integram diretamente ao Secret Manager. Você precisa usar o Secret Manager para armazenar e gerenciar seus secrets.

O formato para especificar um secret é: projects/{PROJECT_ID}/secrets/{SECRET_NAME}/versions/{VERSION_ID}

  • PROJECT_ID: o ID do projeto em que seu secret do Secret Manager reside.

  • SECRET_NAME: o nome do secret no Secret Manager.

  • VERSION_ID: o número da versão específica do secret. É um número como "1", "2", "3".

É possível carregar até 32 secrets em um único cluster do Connect.

Verifique se o agente de serviço que executa seus workers do Connect tem o papel secretmanager.secretAccessor (Acessador de secrets do Secret Manager) nos secrets que você quer usar. Essa função permite que o cluster do Connect recupere os valores de secret do Secret Manager.

Rótulos

Os rótulos são pares de chave-valor que ajudam na organização e identificação. Elas ajudam a organizar clusters do Connect. É possível anexar um rótulo a cada cluster do Connect e filtrar os recursos com base nesses rótulos. Exemplos de rótulos são environment:prod, application:web-app.

Criar um cluster do Connect

Antes de criar um cluster, consulte a documentação sobre propriedades do cluster do Connect.

A criação de um cluster do Connect leva de 20 a 30 minutos.

Console

  1. No console do Google Cloud , acesse a página Conectar clusters.

    Acessar o Connect Clusters

  2. Clique em Criar.

    A página Criar um cluster do Connect é aberta.

  3. Em Nome do cluster do Connect, insira uma string.

    Para mais informações sobre como nomear um cluster do Connect, consulte as Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka.

  4. Em Cluster principal do Kafka, selecione um cluster do Serviço gerenciado para Apache Kafka no menu.

    Para mais informações sobre as funções que esse cluster do Serviço Gerenciado para Apache Kafka realiza, consulte Cluster principal do Kafka.

  5. Em Local, selecione um local compatível no menu Região ou mantenha o valor padrão.

    Para mais informações sobre como selecionar o local certo, consulte Cluster principal do Kafka.

  6. Em Configuração de capacidade, insira valores para vCPUs e Memória ou mantenha os valores padrão.

    Em vCPUs, insira o número de CPUs virtuais do cluster.

    Em Memória, insira a quantidade de memória por CPU em GiB. Uma mensagem de erro será exibida se a memória por CPU for maior que 8 GiB.

    Para mais informações sobre como dimensionar um cluster do Serviço gerenciado para Apache Kafka, consulte Configuração de capacidade.

  7. Em Configuração de rede, no menu Rede, selecione ou mantenha a rede do cluster principal do Serviço gerenciado para Apache Kafka.

  8. Em Sub-rede de worker, selecione ou mantenha a sub-rede no menu.

    O campo Caminho de URI da sub-rede é preenchido automaticamente. Para mais informações, consulte Sub-rede de worker.

  9. Em Domínios DNS solucionáveis, o domínio DNS do cluster principal do Kafka é adicionado automaticamente como um domínio DNS solucionável.

    Para adicionar outros domínios de DNS, expanda a seção, se necessário.

  10. Clique em Adicionar um domínio DNS.

    Selecione um cluster do Kafka no menu.

    O domínio DNS é preenchido automaticamente. Também é possível digitar o nome de domínio DNS de um cluster externo do Kafka.

    Clique em Concluído.

  11. Em Recursos do Secret Manager, expanda a seção se necessário.

  12. Clique em Adicionar recurso de secret.

  13. Selecione um secret no menu Secret e uma versão no menu Versão do secret. Também é possível criar um novo Secret.

    Verifique se o agente de serviço que executa seus workers do Connect tem o papel de acessador de secrets do Secret Manager nos secrets que você quer usar. Para mais informações sobre o Secret Manager, consulte Recursos do Secret Manager.

  14. Clique em Concluído.

  15. Clique em Adicionar recurso secret se precisar adicionar mais secrets.

  16. Em Rótulos, expanda a seção se necessário.

    Para organizar o projeto, adicione rótulos arbitrários como pares de chave-valor aos recursos.

    Clique em Adicionar rótulo para incluir diferentes ambientes, serviços, proprietários, equipes e assim por diante.

  17. Clique em Criar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka connect-clusters create:

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    Substitua:

    • CONNECT_CLUSTER_ID: o ID ou nome do cluster do Connect. Para conferir as diretrizes de nomeação de clusters Connect, acesse Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome de um cluster do Connect é imutável.

    • LOCATION: o local em que você cria o cluster do Connect. Precisa ser uma região Google Cloudcompatível. Não é possível mudar o local de um cluster do Connect depois da criação. Para conferir uma lista de locais disponíveis, consulte Locais do Serviço gerenciado para Apache Kafka. Para mais informações sobre recomendações de local, consulte Cluster principal do Kafka.

    • CPU: o número de vCPUs para o cluster do Connect. O valor mínimo é 3 vCPUs. Consulte Contagem de vCPU.

    • MEMORY: a quantidade de memória para o cluster do Connect. Use as unidades "MB", "MiB", "GB", "GiB", "TB" ou "TiB". Por exemplo, "3GiB". É necessário provisionar entre 1 GiB e 8 GiB por vCPU. Consulte Memória.

    • WORKER_SUBNET: a sub-rede de worker do cluster do Connect.

      O formato da sub-rede é projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

      A sub-rede de worker precisa estar na mesma região do cluster do Connect.

    • PROJECT_ID: (opcional) o ID do projetoGoogle Cloud . Se não for fornecido, o projeto atual será usado.

    • KAFKA_CLUSTER: o ID ou nome totalmente qualificado do cluster principal do Serviço gerenciado para Apache Kafka associado ao cluster do Connect. Consulte Cluster do Kafka. O formato do cluster do Kafka é projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID.

      Não é possível atualizar para um cluster do Kafka diferente depois de criar o cluster do Connect.

    • SECRET: (opcional) secrets a serem carregados nos workers. As versões exatas do secret do Secret Manager precisam ser fornecidas. Não há suporte para aliases. É possível carregar até 32 secrets em um cluster. Formato: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME: (opcional) nomes de domínio DNS da sub-rede a serem disponibilizados para o Connect Cluster. O cluster do Connect pode acessar recursos usando nomes de domínio em vez de depender de endereços IP. Consulte Peering de DNS.

    • LABELS: (opcional) rótulos a serem associados ao cluster. Para mais informações sobre o formato dos rótulos, consulte Rótulos. Lista de pares de rótulos KEY=VALUE a serem adicionados. As chaves precisam começar com um caractere minúsculo e conter apenas hifens (-), sublinhados (_), caracteres minúsculos e números. Os valores precisam conter apenas hifens (-), sublinhados (_), caracteres minúsculos e números.

    • CONFIG_FILE: (opcional) o caminho para o arquivo JSON ou YAML que contém a configuração substituída dos padrões do cluster ou conector. Esse arquivo também aceita JSON ou YAML inline.

    • --async: (opcional) retorna imediatamente, sem aguardar a conclusão da operação em andamento. Com a flag --async, você pode continuar com outras tarefas enquanto a criação do cluster acontece em segundo plano. Se você não usar a flag, o sistema vai aguardar a conclusão da operação antes de retornar uma resposta. Aguarde até que o cluster seja totalmente atualizado antes de continuar com outras tarefas.

    Você vai receber uma resposta semelhante a esta:

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    Armazene o OPERATION_ID para acompanhar o progresso. Por exemplo, o valor aqui é operation-1753590328249-63ae19098cc06-64300a0a-06512d02.

  3. Terraform

    É possível usar um recurso do Terraform para criar um cluster do Connect.

    resource "google_managed_kafka_connect_cluster" "default" {
      provider           = google-beta
      project            = data.google_project.default.project_id
      connect_cluster_id = "my-connect-cluster-id"
      location           = "us-central1"
      kafka_cluster      = google_managed_kafka_cluster.default.id
      capacity_config {
        vcpu_count   = 12
        memory_bytes = 12884901888 # 12 GiB
      }
      gcp_config {
        access_config {
          network_configs {
            primary_subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

    Go

    Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)
    
    	// Capacity configuration with 12 vCPU and 12 GiB RAM
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   12,
    		MemoryBytes: 12884901888, // 12 GiB in bytes
    	}
    
    	// Optionally, you can also specify accessible subnets and resolvable DNS
    	// domains as part of your network configuration. For example:
    	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
    	// 	{
    	// 		PrimarySubnet:      primarySubnet,
    	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
    	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
    	// 	},
    	// }
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		KafkaCluster:   kafkaCluster,
    		CapacityConfig: capacityConfig,
    	}
    
    	req := &managedkafkapb.CreateConnectClusterRequest{
    		Parent:           locationPath,
    		ConnectClusterId: clusterID,
    		ConnectCluster:   connectCluster,
    	}
    	op, err := client.CreateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
    import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
    import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
        int cpu = 12;
        long memoryBytes = 12884901888L; // 12 GiB
        createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
      }
    
      public static void createConnectCluster(
          String projectId,
          String region,
          String clusterId,
          String subnet,
          String kafkaCluster,
          int cpu,
          long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
            .setMemoryBytes(memoryBytes).build();
        ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
            .setPrimarySubnet(subnet)
            .build();
        // Optionally, you can also specify additional accessible subnets and resolvable
        // DNS domains as part of your network configuration. For example:
        // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
        // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
        ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
            .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setKafkaCluster(kafkaCluster)
            .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
            .create(settingsBuilder.build())) {
          CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setConnectClusterId(clusterId)
              .setConnectCluster(connectCluster)
              .build();
    
          // The duration of this operation can vary considerably, typically taking
          // between 10-30 minutes.
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .createConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(), operation.isDone(), future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone
          // = True)
          ConnectCluster response = future.get();
          System.out.printf("Created connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
              e.getMessage());
          throw e;
        }
      }
    }

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig
    
    # TODO(developer): Update with your values.
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # kafka_cluster_id = "my-kafka-cluster"
    # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 12
    # memory_bytes = 12884901888  # 12 GiB
    
    connect_client = ManagedKafkaConnectClient()
    kafka_client = managedkafka_v1.ManagedKafkaClient()
    
    parent = connect_client.common_location_path(project_id, region)
    kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    connect_cluster.kafka_cluster = kafka_cluster_path
    connect_cluster.capacity_config.vcpu_count = cpu
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
    # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
    # For example:
    # connect_cluster.gcp_config.access_config.network_configs = [
    #     ConnectNetworkConfig(
    #         primary_subnet=primary_subnet,
    #         additional_subnets=additional_subnets,
    #         dns_domain_names=dns_domain_names,
    #     )
    # ]
    
    request = CreateConnectClusterRequest(
        parent=parent,
        connect_cluster_id=connect_cluster_id,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.create_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # Creating a Connect cluster can take 10-40 minutes.
        response = operation.result(timeout=3000)
        print("Created Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

Monitorar a operação de criação do cluster

Só é possível executar o comando a seguir se você tiver executado a CLI gcloud para criar o cluster do Connect.

  • A criação de um cluster do Connect geralmente leva de 20 a 30 minutos. Para acompanhar o progresso da criação do cluster, o comando gcloud managed-kafka connect-clusters create usa uma operação de longa duração (LRO, na sigla em inglês), que pode ser monitorada com o seguinte comando:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    Substitua:

    • OPERATION_ID com o valor do ID da operação da seção anterior.
    • LOCATION com o valor do local da seção anterior.

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.