Criar um cluster do Connect

Um cluster do Connect oferece um ambiente para conectores que ajuda a mover dados de implantações do Kafka para um cluster do Serviço Gerenciado do Google Cloud para Apache Kafka ou de um cluster do Serviço Gerenciado para Apache Kafka para outro serviço Google Cloud ou cluster do Kafka. O cluster secundário do Kafka pode ser outro cluster do Serviço Gerenciado do Google Cloud para Apache Kafka, um cluster autogerenciado ou um local.

Antes de começar

Verifique se você já criou um cluster do Serviço gerenciado para Apache Kafka. Você precisa do nome do cluster do Serviço gerenciado para Apache Kafka a que o cluster do Connect será anexado.

Cada cluster do Connect está associado a um cluster do Serviço gerenciado para Apache Kafka. Esse cluster armazena o estado dos conectores em execução no cluster do Connect.

Papéis e permissões necessários para criar um cluster do Connect

Para receber as permissões necessárias para criar um cluster do Connect, peça ao administrador para conceder a você o papel do IAM de Editor de cluster gerenciado do Kafka Connect (roles/managedkafka.connectClusterEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um cluster do Connect. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um cluster do Connect:

  • Conceda a permissão para criar um cluster do Connect no local especificado: managedkafka.connectClusters.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Principais da ACL obrigatórios

Por padrão, os clusters do Serviço Gerenciado para Apache Kafka permitem que o cluster do Connect acesse recursos se nenhuma ACL estiver configurada. Isso é feito ao definir allow.everyone.if.no.acl.found como true, que é a configuração padrão.

No entanto, se o cluster do Serviço gerenciado para Apache Kafka tiver ACLs configuradas, o cluster do Connect não terá automaticamente as permissões de leitura e gravação para os recursos. É necessário concedê-las manualmente.

A conta de serviço do cluster do Connect usada como principal em ACLs segue este formato: User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com.

Se você configurou ACLs no cluster do Kafka, conceda ao cluster do Connect permissões de leitura e gravação para tópicos e permissões de leitura para grupos de consumidores usando os seguintes comandos:

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

Para mais informações sobre esses comandos, consulte Configurar ACLs do Apache Kafka para controle de acesso granular.

Criar um cluster do Connect em outro projeto

O Serviço Gerenciado para Apache Kafka usa um agente de serviço para acessar recursos doGoogle Cloud . O agente de serviço está associado ao projeto em que você cria o cluster.

Se você criar um cluster do Connect em um projeto diferente do cluster do Serviço Gerenciado para Apache Kafka, o cluster do Connect e o cluster do Kafka vão usar os agentes de serviço associados aos respectivos projetos. Nesse caso, o agente de serviço do cluster do Connect precisa de permissão para acessar recursos Google Cloud no projeto do cluster do Kafka.

Para conceder as permissões necessárias, atribua o papel Agente de serviço do Managed Kafka ao agente de serviço do cluster do Connect no projeto do cluster do Kafka. Por exemplo, se você criar um cluster do Kafka no projeto kafka-project e um cluster do Connect no projeto connect-project, conceda o papel de agente de serviço do Managed Kafka em kafka-project ao agente de serviço associado a connect-project.

O endereço de e-mail do agente de serviço tem o seguinte formato: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com, em que PROJECT_NUMBER é o número do projeto. Para mais informações sobre como conceder o papel, consulte Criar e conceder papéis a agentes de serviço.

Propriedades de um cluster do Connect

Nesta seção, descrevemos as propriedades de um cluster do Connect.

Nome do cluster Connect

O nome do cluster do Connect que você está criando. Para conferir as diretrizes de nomeação de clusters Connect, acesse Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome de um cluster é imutável.

Cluster principal do Kafka

O cluster do Serviço gerenciado para Apache Kafka associado ao seu cluster do Connect. O cluster associado (cluster principal) armazena o estado dos conectores em execução no cluster do Connect. Em geral, o cluster principal do Serviço Gerenciado para Apache Kafka também serve como destino para todos os conectores de origem e entrada para todos os conectores de coletor em execução no cluster do Connect.

Um único cluster do Serviço Gerenciado para Apache Kafka pode ter vários clusters do Connect. Se você escolher um cluster do Serviço gerenciado para Apache Kafka em um projeto diferente, verifique se as permissões adequadas estão configuradas.

Não é possível atualizar para um cluster do Kafka diferente depois de criar o cluster do Connect.

Benefícios da colocalização de regiões para latência e custos de rede

Colocar os clusters do Serviço Gerenciado para Apache Kafka e do Connect na mesma região reduz a latência e os custos de rede. Por exemplo, suponha que seu cluster do Serviço gerenciado para Apache Kafka esteja em region-a e você esteja usando um conector de coletor para gravar dados desse cluster (origem) em uma tabela do BigQuery (coletor) que também esteja em region-a. Se você implantar o cluster do Connect em region-a, essa escolha de implantação vai minimizar a latência da operação de gravação do BigQuery e eliminar os custos de transferência de rede entre regiões do cluster do Serviço Gerenciado para Apache Kafka e o cluster do Connect.

Considerações sobre custo e latência de vários sistemas

O Kafka Connect usa conectores para mover dados entre sistemas. Um lado do conector sempre interage com um cluster do Serviço Gerenciado para Apache Kafka. Um único cluster do Kafka Connect pode executar vários conectores, cada um atuando como uma origem (extraindo dados de um sistema) ou um coletor (enviando dados para um sistema).

Embora um cluster do Connect na mesma região que o cluster do Serviço gerenciado para Apache Kafka se beneficie de uma latência de comunicação menor entre eles, cada conector também interage com outro sistema, como uma tabela do BigQuery ou outro cluster do Kafka. Mesmo que o cluster do Connect e o cluster do Serviço Gerenciado para Apache Kafka estejam no mesmo local, esse outro sistema pode estar em uma região diferente. Isso leva a maior latência e custo. A latência geral do pipeline depende dos locais de todos os três sistemas: o cluster do Serviço gerenciado para Apache Kafka, o cluster do Connect e o sistema de origem ou destino.

Por exemplo, se o cluster do Serviço gerenciado para Apache Kafka estiver em region-a, o cluster do Connect em region-b e você estiver usando um conector do Cloud Storage para um bucket em region-c, será cobrado por dois hops de rede (region-a para region-b e depois region-b para region-c, ou o inverso, dependendo da direção do conector).

Considere todas as regiões envolvidas ao planejar o posicionamento do cluster do Connect para otimizar a latência e o custo.

Configuração de capacidade

A configuração da capacidade exige que você defina o número de vCPUs e a quantidade de memória para cada vCPU do cluster do Connect. É possível atualizar a capacidade de um cluster do Connect depois de criá-lo. Confira abaixo as propriedades para configuração de capacidade:

  • vCPUs: o número de vCPUs atribuídas a um cluster do Connect. O valor mínimo é de 3 vCPUs.

  • Memória: a quantidade de memória atribuída a cada vCPU. É necessário provisionar entre 1 GiB e 8 GiB por vCPU. É possível aumentar ou diminuir a quantidade de memória dentro desses limites depois que o cluster é criado.

    Por exemplo, se você criar um cluster com 6 vCPUs, a memória mínima que poderá alocar para o cluster será de 6 GiB (1 GiB por vCPU), e a máxima será de 48 GiB (8 GiB por vCPU).

A vCPU e a memória alocadas para cada worker em um cluster do Connect têm um impacto significativo no desempenho, na capacidade e no custo do cluster. Confira um resumo de como a vCPU e a memória afetam um cluster do Connect.

Contagem de vCPU

  • O Kafka Connect divide o trabalho de um conector em tarefas. Cada tarefa pode processar dados em paralelo. Mais vCPUs significam que mais tarefas podem ser executadas simultaneamente, o que leva a uma maior capacidade de processamento.

  • Mais vCPUs aumentam os custos do cluster do Connect.

Memória

  • O Kafka Connect usa memória para armazenar dados em buffer à medida que eles fluem entre conectores e o Serviço gerenciado para Apache Kafka. Uma memória maior permite buffers maiores. Uma memória grande pode melhorar a capacidade de transferência, especialmente para fluxos de dados de alto volume. Os conectores que lidam com mensagens ou registros muito grandes exigem memória suficiente para processá-los sem encontrar exceções de OutOfMemoryError.

  • Mais memória aumenta o custo do cluster do Connect.

  • Se você estiver usando uma lógica de transformação pesada, vai precisar de mais alocação de memória.

Seu objetivo é escolher a configuração de capacidade certa para seu cluster do Connect. Para isso, é preciso entender a capacidade de processamento que seu cluster do Connect pode lidar.

Sub-rede de worker (principal)

A sub-rede de worker, também conhecida como sub-rede principal, conecta sua rede VPC ao cluster do Connect. Essa sub-rede permite que os trabalhadores do cluster alcancem os endpoints de fontes e receptores na rede do consumidor, como clusters do Serviço Gerenciado para Apache Kafka ou clusters do Kafka auto-hospedados.

Confira alguns requisitos para configurar a sub-rede de worker:

  • A sub-rede do worker é obrigatória.

  • A sub-rede precisa estar na mesma região do cluster do Connect.

  • A sub-rede precisa estar na mesma VPC principal que uma das sub-redes conectadas do cluster principal do Kafka.

  • O intervalo de CIDR da sub-rede precisa ter um tamanho mínimo de /22 (1.024 endereços).

Os workers do cluster recebem endereços IP na sub-rede de worker usando uma interface do Private Service Connect. Os workers podem alcançar qualquer destino de rede acessível na rede VPC da sub-rede, com os seguintes requisitos:

  • O endpoint não pode estar no intervalo CIDR 172.16.0.0/14. Esse intervalo é reservado para uso interno do Serviço gerenciado para Apache Kafka Connect.
  • As regras de firewall precisam permitir o tráfego. Consulte Configurar a segurança para anexos de rede.
  • Para o tráfego da Internet, configure um Cloud NAT. Por exemplo, um Cloud NAT é necessário para que um conector do MirrorMaker replique dados de um cluster do Kafka acessível pela Internet.
  • Para acessar endpoints do Private Service Connect que estão em uma VPC diferente da VPC da sub-rede do worker, verifique se você está usando uma configuração de consumidor compatível (por exemplo, NCC). Para mais informações, consulte Sobre como acessar serviços publicados por meio de endpoints.

Domínios DNS solucionáveis

Os domínios DNS resolvíveis, também conhecidos como nomes de domínio DNS, permitem que os endereços DNS na rede VPC do consumidor sejam disponibilizados para a VPC do locatário. Isso permite que o cluster do Connect resolva nomes DNS para endereços IP, facilitando a comunicação com outros serviços, incluindo outros clusters do Kafka para conectores do MirrorMaker.

Para domínios DNS resolvíveis, selecione um cluster do Serviço Gerenciado para Apache Kafka. Não é necessário configurar o nome de domínio DNS para o cluster principal do Serviço gerenciado para Apache Kafka, já que o endereço de bootstrap dele é incluído automaticamente na lista de domínios DNS solucionáveis.

No entanto, também é possível especificar um domínio DNS manualmente, o que é necessário se você selecionar um cluster externo do Kafka. O domínio de DNS do cluster principal do Serviço Gerenciado para Apache Kafka é incluído automaticamente. Outros clusters do Kafka ainda exigem a configuração de domínios DNS.

Recursos do Secret Manager

Alguns conectores exigem dados sensíveis, como senhas, como parte da configuração. Para gerenciar esse tipo de dado com segurança, armazene-o no Secret Manager e conceda ao cluster do Connect acesso ao secret.

Para usar secrets do Secret Manager com o Kafka Connect, faça o seguinte:

  1. Conceda o papel Acessador de secrets do Secret Manager (roles/secretmanager.secretAccessor) à conta de serviço do Kafka gerenciado. Essa função permite que o cluster do Connect acesse secrets.

  2. Crie um secret que contenha os dados sensíveis. Para mais informações, consulte Criar um secret.

  3. Ao criar ou atualizar o cluster do Connect, especifique os secrets a que o cluster tem acesso. É possível especificar até 32 secrets por cluster do Connect.

Os secrets são montados como arquivos nos workers do cluster. Os conectores têm acesso somente leitura a esses arquivos. Ao criar um conector, as propriedades de configuração dele podem fazer referência aos secrets.

  • Para fazer referência ao caminho de um arquivo secreto, use o seguinte formato:

    /var/secrets/PROJECT_NAME-SECRET_NAME-SECRET_VERSION
    

    Exemplo: ssl.truststore.location=/var/secrets/project1-truststore-1

  • Para usar o valor de um secret como um valor de configuração (por exemplo, uma senha), use o seguinte formato:

    ${directory:/var/secrets:PROJECT_NAME-SECRET_NAME-SECRET_VERSION}
    

    Exemplo: password=${directory:/var/secrets:project1-database_password-3}

Substitua:

  • PROJECT_NAME: o nome do projeto Google Cloud .
  • SECRET_NAME: o nome do secret.
  • SECRET_VERSION: a versão do secret.

Rótulos

Os rótulos são pares de chave-valor que ajudam na organização e identificação. Elas ajudam a organizar clusters do Connect. É possível anexar um rótulo a cada cluster do Connect e filtrar os recursos com base nesses rótulos. Exemplos de rótulos são environment:prod, application:web-app.

Criar um cluster do Connect

Antes de criar um cluster, consulte a documentação sobre propriedades do cluster do Connect.

A criação de um cluster do Connect leva de 20 a 30 minutos.

Console

  1. No console do Google Cloud , acesse a página Conectar clusters.

    Acessar o Connect Clusters

  2. Clique em Criar.

  3. No campo Nome do cluster do Connect, insira um nome para o cluster do Connect. Para mais informações, consulte Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka.

  4. Na lista Cluster principal do Kafka, selecione um cluster do Serviço gerenciado para Apache Kafka. Para mais informações, consulte Cluster principal do Kafka.

  5. Na lista Região, selecione um local para o cluster do Connect. Para mais informações sobre como selecionar um local, consulte Cluster principal do Kafka.

  6. Na seção Configuração de capacidade, insira valores nos seguintes campos ou mantenha os valores padrão.

    • No campo vCPUs, insira o número de CPUs virtuais para o cluster.

    • No campo Memória, insira a quantidade de memória por CPU, em GiB. O valor não pode exceder 8 GiB por CPU.

    Para mais informações, consulte Configuração de capacidade.

  7. Na seção Configuração de rede, selecione uma rede VPC na lista Rede ou deixe o valor padrão. Essa lista é preenchida quando você seleciona o cluster principal do Kafka.

  8. Na seção Sub-rede de nós de trabalho, selecione uma sub-rede na lista Sub-rede ou deixe o valor padrão. Para mais informações, consulte Sub-rede de worker. O campo Caminho de URI da sub-rede é preenchido automaticamente quando você seleciona a sub-rede.

  9. Opcional: adicione um domínio DNS solucionável. O domínio de DNS do cluster principal do Kafka é adicionado automaticamente como um domínio DNS solucionável. Para especificar outros domínios DNS, faça o seguinte:

    1. Expanda a seção Domínios DNS solucionáveis.

    2. Clique em Adicionar domínio de DNS.

    3. Para adicionar o domínio DNS de um cluster do Serviço gerenciado para Apache Kafka, selecione o cluster na lista Cluster do Kafka. Caso contrário, insira o domínio de DNS no campo Domínio de DNS.

    4. Clique em Concluído.

  10. Opcional: para adicionar recursos do Secret Manager, faça o seguinte:

    1. Expanda a seção Recursos do Secret Manager.

    2. Clique em Adicionar recurso secreto.

    3. Na lista Secret, selecione um secret.

    4. Na lista Versão do secret, selecione uma versão do secret.

    5. Clique em Concluído.

  11. Opcional: adicione rótulos para organizar seu projeto. Para adicionar um rótulo, faça o seguinte:

    1. Expanda a seção Rótulos.

    2. Clique em Adicionar rótulo.

    3. No campo Chave, insira a chave do rótulo.

    4. No campo Valor, insira o valor do rótulo.

  12. Clique em Criar.

gcloud

  1. No console do Google Cloud , ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do console Google Cloud , uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a CLI do Google Cloud já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o comando gcloud managed-kafka connect-clusters create:

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    Substitua:

    • CONNECT_CLUSTER_ID: o ID ou nome do cluster do Connect. Para conferir as diretrizes de nomeação de clusters Connect, acesse Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome de um cluster do Connect é imutável.

    • LOCATION: o local em que você cria o cluster do Connect. Precisa ser uma região Google Cloudcompatível. Não é possível mudar o local de um cluster do Connect depois da criação. Consulte a lista de locais disponíveis em Locais do Serviço gerenciado para Apache Kafka. Para mais informações sobre recomendações de local, consulte Cluster principal do Kafka.

    • CPU: o número de vCPUs para o cluster do Connect. O valor mínimo é 3 vCPUs. Consulte Contagem de vCPU.

    • MEMORY: a quantidade de memória para o cluster do Connect. Use as unidades "MB", "MiB", "GB", "GiB", "TB" ou "TiB". Por exemplo, "3GiB". É necessário provisionar entre 1 GiB e 8 GiB por vCPU. Consulte Memória.

    • WORKER_SUBNET: a sub-rede de trabalho do cluster do Connect.

      O formato da sub-rede é projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

      A sub-rede de worker precisa estar na mesma região do cluster do Connect.

    • PROJECT_ID: (opcional) o ID do projetoGoogle Cloud . Se não for fornecido, o projeto atual será usado.

    • KAFKA_CLUSTER: o ID ou nome totalmente qualificado do cluster principal do Serviço gerenciado para Apache Kafka associado ao cluster do Connect. Consulte Cluster do Kafka. O formato do cluster do Kafka é projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID.

      Não é possível atualizar para um cluster do Kafka diferente depois de criar o cluster do Connect.

    • SECRET: (opcional) secrets a serem carregados nos workers. As versões exatas do Secret Manager precisam ser fornecidas. Aliases não são aceitos. É possível carregar até 32 secrets em um cluster. Formato: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME: (opcional) nomes de domínio DNS da sub-rede a serem disponibilizados para o Connect Cluster. O cluster do Connect pode acessar recursos usando nomes de domínio em vez de depender de endereços IP. Consulte Peering de DNS.

    • LABELS: (opcional) rótulos a serem associados ao cluster. Para mais informações sobre o formato dos rótulos, consulte Rótulos. Lista de pares de rótulos KEY=VALUE a serem adicionados. As chaves precisam começar com um caractere minúsculo e conter apenas hifens (-), sublinhados (_), caracteres minúsculos e números. Os valores precisam conter apenas hifens (-), sublinhados (_), caracteres minúsculos e números.

    • CONFIG_FILE: (opcional) o caminho para o arquivo JSON ou YAML que contém a configuração substituída dos padrões do cluster ou conector. Esse arquivo também aceita JSON ou YAML inline.

    • --async: (opcional) retorna imediatamente, sem aguardar a conclusão da operação em andamento. Com a flag --async, você pode continuar com outras tarefas enquanto a criação do cluster acontece em segundo plano. Se você não usar a flag, o sistema vai aguardar a conclusão da operação antes de retornar uma resposta. Aguarde até que o cluster seja totalmente atualizado antes de continuar com outras tarefas.

    Você vai receber uma resposta semelhante a esta:

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    Armazene o OPERATION_ID para acompanhar o progresso. Por exemplo, o valor aqui é operation-1753590328249-63ae19098cc06-64300a0a-06512d02.

Terraform

É possível usar um recurso do Terraform para criar um cluster do Connect.

resource "google_managed_kafka_connect_cluster" "default" {
  provider           = google-beta
  project            = data.google_project.default.project_id
  connect_cluster_id = "my-connect-cluster-id"
  location           = "us-central1"
  kafka_cluster      = google_managed_kafka_cluster.default.id
  capacity_config {
    vcpu_count   = 12
    memory_bytes = 12884901888 # 12 GiB
  }
  gcp_config {
    access_config {
      network_configs {
        primary_subnet = google_compute_subnetwork.default.id
      }
    }
  }
}

Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

Go

Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-connect-cluster"
	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)

	// Capacity configuration with 12 vCPU and 12 GiB RAM
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   12,
		MemoryBytes: 12884901888, // 12 GiB in bytes
	}

	// Optionally, you can also specify accessible subnets and resolvable DNS
	// domains as part of your network configuration. For example:
	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
	// 	{
	// 		PrimarySubnet:      primarySubnet,
	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
	// 	},
	// }

	connectCluster := &managedkafkapb.ConnectCluster{
		Name:           clusterPath,
		KafkaCluster:   kafkaCluster,
		CapacityConfig: capacityConfig,
	}

	req := &managedkafkapb.CreateConnectClusterRequest{
		Parent:           locationPath,
		ConnectClusterId: clusterID,
		ConnectCluster:   connectCluster,
	}
	op, err := client.CreateConnectCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
import com.google.cloud.managedkafka.v1.ConnectCluster;
import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateConnectCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
    int cpu = 12;
    long memoryBytes = 12884901888L; // 12 GiB
    createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
  }

  public static void createConnectCluster(
      String projectId,
      String region,
      String clusterId,
      String subnet,
      String kafkaCluster,
      int cpu,
      long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
        .setMemoryBytes(memoryBytes).build();
    ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
        .setPrimarySubnet(subnet)
        .build();
    // Optionally, you can also specify additional accessible subnets and resolvable
    // DNS domains as part of your network configuration. For example:
    // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
    // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
    ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
        .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
        .build();
    ConnectCluster connectCluster = ConnectCluster.newBuilder()
        .setCapacityConfig(capacityConfig)
        .setGcpConfig(gcpConfig)
        .setKafkaCluster(kafkaCluster)
        .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createConnectClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
        .create(settingsBuilder.build())) {
      CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
          .setParent(LocationName.of(projectId, region).toString())
          .setConnectClusterId(clusterId)
          .setConnectCluster(connectCluster)
          .build();

      // The duration of this operation can vary considerably, typically taking
      // between 10-30 minutes.
      OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
          .createConnectClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone
      // = True)
      ConnectCluster response = future.get();
      System.out.printf("Created connect cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
          e.getMessage());
      throw e;
    }
  }
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig

# TODO(developer): Update with your values.
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# kafka_cluster_id = "my-kafka-cluster"
# primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 12
# memory_bytes = 12884901888  # 12 GiB

connect_client = ManagedKafkaConnectClient()
kafka_client = managedkafka_v1.ManagedKafkaClient()

parent = connect_client.common_location_path(project_id, region)
kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)

connect_cluster = ConnectCluster()
connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
connect_cluster.kafka_cluster = kafka_cluster_path
connect_cluster.capacity_config.vcpu_count = cpu
connect_cluster.capacity_config.memory_bytes = memory_bytes
connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
# Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
# For example:
# connect_cluster.gcp_config.access_config.network_configs = [
#     ConnectNetworkConfig(
#         primary_subnet=primary_subnet,
#         additional_subnets=additional_subnets,
#         dns_domain_names=dns_domain_names,
#     )
# ]

request = CreateConnectClusterRequest(
    parent=parent,
    connect_cluster_id=connect_cluster_id,
    connect_cluster=connect_cluster,
)

try:
    operation = connect_client.create_connect_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # Creating a Connect cluster can take 10-40 minutes.
    response = operation.result(timeout=3000)
    print("Created Connect cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Monitorar a operação de criação do cluster

Só é possível executar o comando a seguir se você tiver executado a CLI gcloud para criar o cluster do Connect.

  • A criação de um cluster do Connect geralmente leva de 20 a 30 minutos. Para acompanhar o progresso da criação do cluster, o comando gcloud managed-kafka connect-clusters create usa uma operação de longa duração (LRO), que pode ser monitorada com o seguinte comando:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    Substitua:

    • OPERATION_ID com o valor do ID da operação da seção anterior.
    • LOCATION com o valor do local da seção anterior.

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.