Créer un cluster Connect

Un cluster Connect fournit un environnement pour les connecteurs qui permet de transférer des données depuis des déploiements Kafka existants vers un cluster Google Cloud Managed Service pour Apache Kafka, ou de transférer des données depuis un cluster Managed Service pour Apache Kafka vers un autre service Google Cloud ou un autre cluster Kafka. Le cluster Kafka secondaire peut être un autre cluster Google Cloud Managed Service pour Apache Kafka, un cluster autogéré ou un cluster sur site.

Avant de commencer

Assurez-vous d'avoir déjà créé un cluster Managed Service pour Apache Kafka. Vous avez besoin du nom du cluster Managed Service pour Apache Kafka auquel le cluster Connect sera associé.

Chaque cluster Connect est associé à un cluster Managed Service pour Apache Kafka. Ce cluster stocke l'état des connecteurs exécutés sur le cluster Connect.

Rôles et autorisations requis pour créer un cluster Connect

Pour obtenir les autorisations nécessaires pour créer un cluster Connect, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de cluster Kafka Connect géré (roles/managedkafka.connectClusterEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un cluster Connect. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un cluster Connect :

  • Accordez l'autorisation de créer un cluster Connect à l'emplacement spécifié : managedkafka.connectClusters.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur ce rôle, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Principaux LCA requis

Par défaut, les clusters Managed Service pour Apache Kafka permettent au cluster Connect d'accéder aux ressources si aucune ACL n'est configurée. Pour ce faire, définissez allow.everyone.if.no.acl.found sur true, qui est le paramètre par défaut.

Toutefois, si le cluster Managed Service pour Apache Kafka dispose de LCA configurées, le cluster Connect n'a pas automatiquement les autorisations de lecture et d'écriture sur les ressources. Vous devez les accorder manuellement.

Le compte de service du cluster Connect utilisé comme compte principal dans les LCA suit le format suivant : User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com.

Si vous avez configuré des LCA sur votre cluster Kafka, accordez au cluster Connect les autorisations de lecture et d'écriture sur les sujets, ainsi que l'autorisation de lecture sur les groupes de consommateurs à l'aide des commandes suivantes :

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

Pour en savoir plus sur ces commandes, consultez Configurer les LCA Apache Kafka pour un contrôle des accès précis.

Créer un cluster Connect dans un autre projet

Lorsque vous créez un cluster Connect, il partage le même agent de service que le cluster Managed Service pour Apache Kafka qui se trouve dans le même projet. Si ce cluster Managed Service pour Apache Kafka est désigné comme cluster Kafka principal associé au cluster Connect, aucune autorisation supplémentaire n'est requise.

L'agent de service est au format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com. Le numéro de projet est celui du projet contenant le cluster Connect et le cluster Managed Service pour Apache Kafka.

Si votre cluster Connect se trouve dans le projet A et que le cluster Managed Service pour Apache Kafka associé se trouve dans le projet B, procédez comme suit :

  1. Assurez-vous que l'API Managed Kafka est activée pour les projets A et B.

    Activer l'API

  2. Identifiez l'agent de service du cluster Connect dans le projet A.

    L'agent de service est au format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com.

  3. Dans le projet B, attribuez le rôle Client Managed Kafka (roles/managedkafka.client) au compte de service du cluster Connect.

    Ce rôle accorde les autorisations nécessaires pour se connecter au cluster Managed Service pour Apache Kafka et effectuer des opérations telles que la lecture et l'écriture de données.

    Pour savoir comment attribuer ce rôle, consultez Créer et attribuer des rôles aux agents de service.

Respectez toujours le principe du moindre privilège lorsque vous accordez des autorisations. N'accordez que les autorisations nécessaires pour assurer la sécurité et empêcher tout accès non autorisé.

Propriétés d'un cluster Connect

Cette section décrit les propriétés d'un cluster Connect.

Nom du cluster Connect

Nom du cluster Connect que vous créez. Pour obtenir des instructions sur la façon de nommer un cluster Connect, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un cluster est immuable.

Cluster Kafka principal

Cluster Managed Service pour Apache Kafka associé à votre cluster Connect. Ce cluster associé (cluster principal) stocke l'état des connecteurs exécutés sur le cluster Connect. En règle générale, le cluster Managed Service pour Apache Kafka principal sert également de destination pour tous les connecteurs sources et d'entrée pour tous les connecteurs cibles exécutés sur le cluster Connect.

Un même cluster Managed Service pour Apache Kafka peut comporter plusieurs clusters Connect. Si vous choisissez un cluster Managed Service pour Apache Kafka dans un autre projet, assurez-vous que les autorisations appropriées sont configurées.

Vous ne pouvez pas passer à un autre cluster Kafka après avoir créé le cluster Connect.

Avantages de la colocation de région pour la latence et les coûts réseau

Le fait de colocaliser vos clusters Managed Service pour Apache Kafka et Connect dans la même région réduit la latence et les coûts réseau. Par exemple, supposons que votre cluster Managed Service pour Apache Kafka se trouve dans region-a et que vous utilisez un connecteur de récepteur pour écrire des données de ce cluster Managed Service pour Apache Kafka (source) dans une table BigQuery (récepteur) qui se trouve également dans region-a. Si vous déployez votre cluster Connect dans region-a, ce choix de déploiement minimise la latence de l'opération d'écriture BigQuery et élimine les coûts de transfert réseau entre régions entre le cluster Managed Service pour Apache Kafka et le cluster Connect.

Considérations sur la latence et les coûts multisystèmes

Kafka Connect utilise des connecteurs pour transférer des données entre les systèmes. L'un des côtés du connecteur interagit toujours avec un cluster Managed Service pour Apache Kafka. Un seul cluster Kafka Connect peut exécuter plusieurs connecteurs, chacun agissant comme une source (extraction de données d'un système) ou un récepteur (envoi de données à un système).

Bien qu'un cluster Connect situé dans la même région que le cluster Managed Service pour Apache Kafka bénéficie d'une latence de communication plus faible entre eux, chaque connecteur interagit également avec un autre système, tel qu'une table BigQuery ou un autre cluster Kafka. Même si le cluster Connect et le cluster Managed Service pour Apache Kafka sont colocalisés, l'autre système peut se trouver dans une autre région. Cela entraîne une latence et des coûts plus élevés. La latence globale du pipeline dépend des emplacements des trois systèmes : le cluster Managed Service pour Apache Kafka, le cluster Connect et le système source ou récepteur.

Par exemple, si votre cluster Managed Service pour Apache Kafka se trouve dans region-a, votre cluster Connect dans region-b et que vous utilisez un connecteur Cloud Storage pour un bucket dans region-c, vous serez facturé pour deux sauts de réseau (de region-a à region-b, puis de region-b à region-c, ou l'inverse selon la direction du connecteur).

Lorsque vous planifiez l'emplacement de votre cluster Connect, tenez compte de toutes les régions concernées afin d'optimiser la latence et les coûts.

Configuration de la capacité

La configuration de la capacité nécessite que vous configuriez le nombre de vCPU et la quantité de mémoire pour chaque vCPU de votre cluster Connect. Vous pouvez mettre à jour la capacité d'un cluster Connect après l'avoir créé. Voici les propriétés de la configuration de la capacité :

  • vCPU : nombre de vCPU attribués à un cluster Connect. La valeur minimale est de trois processeurs virtuels.

  • Mémoire : quantité de mémoire attribuée à chaque processeur virtuel. Vous devez provisionner entre 1 Gio et 8 Gio par processeur virtuel. Vous pouvez augmenter ou diminuer la quantité de mémoire dans ces limites après la création du cluster.

    Par exemple, si vous créez un cluster avec 6 vCPU, la mémoire minimale que vous pouvez allouer au cluster est de 6 Gio (1 Gio par vCPU) et la mémoire maximale est de 48 Gio (8 Gio par vCPU).

Les processeurs virtuels et la mémoire alloués à chaque nœud de calcul d'un cluster Connect ont un impact significatif sur les performances, la capacité et le coût du cluster. Voici une explication de l'impact des vCPU et de la mémoire sur un cluster Connect.

Nombre de vCPU

  • Kafka Connect divise le travail d'un connecteur en tâches. Chaque tâche peut traiter les données en parallèle. Plus le nombre de processeurs virtuels est élevé, plus le nombre de tâches pouvant être exécutées simultanément est important, ce qui permet d'obtenir un débit plus élevé.

  • Plus vous avez de vCPU, plus les coûts de votre cluster Connect sont élevés.

Mémoire

  • Kafka Connect utilise de la mémoire pour mettre en mémoire tampon les données qui transitent entre les connecteurs et Managed Service pour Apache Kafka. Une mémoire plus importante permet d'avoir des tampons plus grands. Une grande quantité de mémoire peut améliorer le débit, en particulier pour les flux de données à volume élevé. Les connecteurs qui traitent des messages ou des enregistrements très volumineux nécessitent suffisamment de mémoire pour les traiter sans rencontrer d'exceptions OutOfMemoryError.

  • Plus de mémoire augmente le coût de votre cluster Connect.

  • Si vous utilisez une logique de transformation lourde, vous avez besoin d'allouer plus de mémoire.

Votre objectif est de choisir la configuration de capacité adaptée à votre cluster Connect. Pour ce faire, vous devez comprendre le débit que votre cluster Connect peut gérer.

Sous-réseau de nœuds de calcul (principal)

Le sous-réseau de nœuds de calcul, également appelé sous-réseau principal, connecte votre réseau VPC au cluster Connect. Ce sous-réseau permet aux nœuds de calcul du cluster d'accéder aux points de terminaison des sources et des récepteurs dans le réseau de consommateurs, tels que les clusters Managed Service pour Apache Kafka ou les clusters Kafka auto-hébergés.

Voici quelques exigences à respecter pour configurer le sous-réseau des nœuds de calcul :

  • Le sous-réseau du nœud de calcul est obligatoire.

  • Le sous-réseau doit se trouver dans la même région que le cluster Connect.

  • Le sous-réseau doit se trouver dans le même VPC parent que l'un des sous-réseaux connectés du cluster Kafka principal.

  • La plage CIDR du sous-réseau doit avoir une taille minimale de /22 (1 024 adresses).

Les nœuds de calcul du cluster se voient attribuer des adresses IP dans le sous-réseau de nœuds de calcul à l'aide d'une interface Private Service Connect. Les nœuds de calcul peuvent atteindre n'importe quelle destination réseau accessible depuis le réseau VPC du sous-réseau, à condition de respecter les exigences suivantes :

  • Le point de terminaison ne doit pas se trouver dans la plage CIDR 172.16.0.0/14. Cette plage est réservée à l'usage interne de Managed Service pour Apache Kafka Connect.
  • Les règles de pare-feu doivent autoriser le trafic. Consultez Configurer la sécurité des rattachements de réseau.
  • Pour le trafic Internet, vous devez configurer Cloud NAT. Par exemple, Cloud NAT est requis pour qu'un connecteur MirrorMaker réplique les données d'un cluster Kafka accessible sur Internet.
  • Pour accéder aux points de terminaison Private Service Connect qui se trouvent dans un autre VPC que le VPC du sous-réseau de nœuds de calcul, vous devez vous assurer d'utiliser une configuration de consommateur compatible (par exemple, NCC). Pour en savoir plus, consultez À propos de l'accès aux services publiés via des points de terminaison.

Domaines DNS pouvant être résolus

Les domaines DNS résolvables, également appelés noms de domaine DNS, permettent de rendre les adresses DNS du réseau VPC consommateur disponibles pour le VPC locataire. Cela permet au cluster Connect de résoudre les noms DNS en adresses IP, ce qui facilite la communication avec d'autres services, y compris d'autres clusters Kafka pour les connecteurs MirrorMaker.

Pour les domaines DNS résolvables, vous pouvez sélectionner un cluster Managed Service pour Apache Kafka. Vous n'avez pas besoin de configurer le nom de domaine DNS pour le cluster Managed Service pour Apache Kafka principal, car son adresse d'amorçage est automatiquement incluse dans la liste des domaines DNS pouvant être résolus.

Toutefois, vous pouvez également spécifier manuellement un domaine DNS, ce qui est nécessaire si vous sélectionnez un cluster Kafka externe. Le domaine DNS du cluster Managed Service pour Apache Kafka principal est automatiquement inclus. La configuration des domaines DNS reste nécessaire pour les autres clusters Kafka.

Ressources Secret Manager

Spécifiez Secret Manager à charger dans les nœuds de calcul. Ces secrets sont stockés de manière sécurisée dans Secret Manager et mis à la disposition de votre cluster Connect.

Vous pouvez éventuellement utiliser Secret Manager dans les configurations de connecteurs. Par exemple, vous pouvez charger un fichier de clé dans votre cluster Connect et demander à votre connecteur de lire le fichier. Les secrets Secret Manager sont installés en tant que fichiers dans les nœuds de calcul.

Les clusters connectés s'intègrent directement à Secret Manager. Vous devez utiliser Secret Manager pour stocker et gérer vos secrets.

Le format pour spécifier un secret est le suivant : projects/{PROJECT_ID}/secrets/{SECRET_NAME}/versions/{VERSION_ID}

  • PROJECT_ID : ID du projet dans lequel réside votre secret Secret Manager.

  • SECRET_NAME : nom du secret dans Secret Manager.

  • VERSION_ID : numéro de version spécifique du secret. Il s'agit d'un nombre tel que "1", "2" ou "3".

Vous pouvez charger jusqu'à 32 secrets dans un même cluster Connect.

Assurez-vous que l'agent de service qui exécute vos workers Connect dispose du rôle secretmanager.secretAccessor (Accesseur de secrets Secret Manager) sur les secrets que vous souhaitez utiliser. Ce rôle permet au cluster Connect de récupérer les valeurs secrètes de Secret Manager.

Étiquettes

Les libellés sont des paires clé/valeur qui vous aident à organiser et à identifier vos ressources. Ils vous aident à organiser les clusters Connect. Vous pouvez associer un libellé à chaque cluster Connect, puis filtrer les ressources par libellé. Exemples de libellés : environment:prod, application:web-app.

Créer un cluster Connect

Avant de créer un cluster, consultez la documentation sur les propriétés des clusters Connect.

La création d'un cluster Connect prend entre 20 et 30 minutes.

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur Créer.

    La page Créer un cluster Connect s'ouvre.

  3. Dans le champ Nom du cluster Connect, saisissez une chaîne.

    Pour en savoir plus sur la façon de nommer un cluster Connect, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

  4. Pour Cluster Kafka principal, sélectionnez un cluster Managed Service pour Apache Kafka dans le menu.

    Pour en savoir plus sur les fonctions de ce cluster Managed Service pour Apache Kafka, consultez Cluster Kafka principal.

  5. Pour Emplacement, sélectionnez un emplacement compatible dans le menu Région ou conservez la valeur par défaut.

    Pour savoir comment sélectionner le bon emplacement, consultez Cluster Kafka principal.

  6. Pour la configuration de la capacité, saisissez des valeurs pour vCPU et Mémoire, ou conservez les valeurs par défaut.

    Pour les vCPUs, saisissez le nombre de processeurs virtuels pour le cluster.

    Pour Mémoire, saisissez la quantité de mémoire par processeur en Gio. Un message d'erreur s'affiche si la mémoire par processeur est supérieure à 8 Gio.

    Pour en savoir plus sur le dimensionnement d'un cluster Managed Service pour Apache Kafka, consultez Configuration de la capacité.

  7. Pour Configuration réseau, dans le menu Réseau, sélectionnez ou conservez le réseau du cluster Managed Service pour Apache Kafka principal.

  8. Pour Sous-réseau de nœuds de calcul, sélectionnez ou conservez le sous-réseau dans le menu.

    Le champ Chemin d'URI du sous-réseau est automatiquement renseigné. Pour en savoir plus, consultez Sous-réseau des nœuds de calcul.

  9. Pour les domaines DNS pouvant être résolus, le domaine DNS du cluster Kafka principal est automatiquement ajouté en tant que domaine DNS pouvant être résolu.

    Pour ajouter d'autres domaines DNS, développez la section si nécessaire.

  10. Cliquez sur Ajouter un domaine DNS.

    Sélectionnez un cluster Kafka dans le menu.

    Le domaine DNS est renseigné automatiquement. Vous pouvez également saisir le nom de domaine DNS d'un cluster Kafka externe.

    Cliquez sur OK.

  11. Pour les ressources Secret Manager, développez la section si nécessaire.

  12. Cliquez sur Ajouter une ressource secrète.

  13. Sélectionnez un secret dans le menu Secret et une version dans le menu Version du secret. Vous pouvez également créer un secret.

    Assurez-vous que l'agent de service qui exécute vos workers Connect dispose du rôle "Accesseur de secrets Secret Manager" pour les secrets que vous souhaitez utiliser. Pour en savoir plus sur Secret Manager, consultez Ressources Secret Manager.

  14. Cliquez sur OK.

  15. Cliquez sur Ajouter une ressource secrète si vous avez besoin d'ajouter d'autres secrets.

  16. Pour Libellés, développez la section si nécessaire.

    Pour organiser votre projet, ajoutez des étiquettes arbitraires sous forme de paires clé/valeur à vos ressources.

    Cliquez sur Ajouter un libellé pour inclure différents environnements, services, propriétaires, équipes, etc.

  17. Cliquez sur Créer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka connect-clusters create :

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    Remplacez les éléments suivants :

    • CONNECT_CLUSTER_ID : ID ou nom du cluster Connect. Pour obtenir des instructions sur la façon de nommer un cluster Connect, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un cluster Connect est immuable.

    • LOCATION : emplacement où vous créez le cluster Connect. Il doit s'agir d'une région Google Cloudcompatible. Vous ne pouvez pas modifier l'emplacement d'un cluster Connect après sa création. Pour obtenir la liste des emplacements disponibles, consultez Emplacements Managed Service pour Apache Kafka. Pour en savoir plus sur les recommandations d'emplacement, consultez Cluster Kafka principal.

    • CPU : nombre de processeurs virtuels pour le cluster Connect. La valeur minimale est de trois processeurs virtuels. Consultez Nombre de vCPU.

    • MEMORY : quantité de mémoire pour le cluster Connect. Utilisez les unités "MB", "MiB", "GB", "GiB", "TB" ou "TiB". Par exemple, "3GiB". Vous devez provisionner entre 1 Gio et 8 Gio par processeur virtuel. Consultez Mémoire.

    • WORKER_SUBNET : sous-réseau de nœuds de calcul pour le cluster Connect.

      Le format du sous-réseau est projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

      Le sous-réseau de nœuds de calcul doit se trouver dans la même région que le cluster Connect.

    • PROJECT_ID : (facultatif) ID du projetGoogle Cloud . Si aucun n'est fourni, le projet actuel est utilisé.

    • KAFKA_CLUSTER : ID ou nom complet du cluster Managed Service pour Apache Kafka principal associé au cluster Connect. Consultez Cluster Kafka. Le format du cluster Kafka est projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID.

      Vous ne pouvez pas passer à un autre cluster Kafka après avoir créé le cluster Connect.

    • SECRET : (facultatif) secrets à charger dans les workers. Les versions exactes des secrets de Secret Manager doivent être fournies. Les alias ne sont pas acceptés. Jusqu'à 32 secrets peuvent être chargés dans un même cluster. Format : projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME : (Facultatif) Noms de domaine DNS du sous-réseau à rendre visibles pour Connect Cluster. Le cluster Connect peut accéder aux ressources à l'aide de noms de domaine au lieu d'adresses IP. Consultez Appairage DNS.

    • LABELS : (facultatif) libellés à associer au cluster. Pour en savoir plus sur le format des libellés, consultez Libellés. Liste de paires clé/valeur de libellés à ajouter. Les clés doivent commencer par une lettre minuscule et ne contenir que des tirets (-), des traits de soulignement (_), des minuscules et des chiffres. Les valeurs doivent contenir uniquement des tirets (-), des traits de soulignement (_), des minuscules et des chiffres.

    • CONFIG_FILE : (facultatif) chemin d'accès au fichier JSON ou YAML contenant la configuration qui remplace les valeurs par défaut du cluster ou du connecteur. Ce fichier est également compatible avec les formats JSON ou YAML intégrés.

    • --async : (facultatif) renvoie immédiatement une réponse, sans attendre la fin de l'opération en cours. L'option --async vous permet de continuer à effectuer d'autres tâches pendant que le cluster est créé en arrière-plan. Si vous n'utilisez pas l'indicateur, le système attend la fin de l'opération avant de renvoyer une réponse. Vous devez attendre que le cluster soit entièrement mis à jour avant de pouvoir effectuer d'autres tâches.

    Vous obtenez une réponse semblable à la suivante :

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    Enregistrez le OPERATION_ID pour suivre votre progression. Par exemple, la valeur ici est operation-1753590328249-63ae19098cc06-64300a0a-06512d02.

  3. Terraform

    Vous pouvez utiliser une ressource Terraform pour créer un cluster Connect.

    resource "google_managed_kafka_connect_cluster" "default" {
      provider           = google-beta
      project            = data.google_project.default.project_id
      connect_cluster_id = "my-connect-cluster-id"
      location           = "us-central1"
      kafka_cluster      = google_managed_kafka_cluster.default.id
      capacity_config {
        vcpu_count   = 12
        memory_bytes = 12884901888 # 12 GiB
      }
      gcp_config {
        access_config {
          network_configs {
            primary_subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)
    
    	// Capacity configuration with 12 vCPU and 12 GiB RAM
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   12,
    		MemoryBytes: 12884901888, // 12 GiB in bytes
    	}
    
    	// Optionally, you can also specify accessible subnets and resolvable DNS
    	// domains as part of your network configuration. For example:
    	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
    	// 	{
    	// 		PrimarySubnet:      primarySubnet,
    	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
    	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
    	// 	},
    	// }
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		KafkaCluster:   kafkaCluster,
    		CapacityConfig: capacityConfig,
    	}
    
    	req := &managedkafkapb.CreateConnectClusterRequest{
    		Parent:           locationPath,
    		ConnectClusterId: clusterID,
    		ConnectCluster:   connectCluster,
    	}
    	op, err := client.CreateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
    import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
    import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
        int cpu = 12;
        long memoryBytes = 12884901888L; // 12 GiB
        createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
      }
    
      public static void createConnectCluster(
          String projectId,
          String region,
          String clusterId,
          String subnet,
          String kafkaCluster,
          int cpu,
          long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
            .setMemoryBytes(memoryBytes).build();
        ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
            .setPrimarySubnet(subnet)
            .build();
        // Optionally, you can also specify additional accessible subnets and resolvable
        // DNS domains as part of your network configuration. For example:
        // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
        // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
        ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
            .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setKafkaCluster(kafkaCluster)
            .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
            .create(settingsBuilder.build())) {
          CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setConnectClusterId(clusterId)
              .setConnectCluster(connectCluster)
              .build();
    
          // The duration of this operation can vary considerably, typically taking
          // between 10-30 minutes.
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .createConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(), operation.isDone(), future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone
          // = True)
          ConnectCluster response = future.get();
          System.out.printf("Created connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
              e.getMessage());
          throw e;
        }
      }
    }

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig
    
    # TODO(developer): Update with your values.
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # kafka_cluster_id = "my-kafka-cluster"
    # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 12
    # memory_bytes = 12884901888  # 12 GiB
    
    connect_client = ManagedKafkaConnectClient()
    kafka_client = managedkafka_v1.ManagedKafkaClient()
    
    parent = connect_client.common_location_path(project_id, region)
    kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    connect_cluster.kafka_cluster = kafka_cluster_path
    connect_cluster.capacity_config.vcpu_count = cpu
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
    # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
    # For example:
    # connect_cluster.gcp_config.access_config.network_configs = [
    #     ConnectNetworkConfig(
    #         primary_subnet=primary_subnet,
    #         additional_subnets=additional_subnets,
    #         dns_domain_names=dns_domain_names,
    #     )
    # ]
    
    request = CreateConnectClusterRequest(
        parent=parent,
        connect_cluster_id=connect_cluster_id,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.create_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # Creating a Connect cluster can take 10-40 minutes.
        response = operation.result(timeout=3000)
        print("Created Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

Surveiller l'opération de création du cluster

Vous ne pouvez exécuter la commande suivante que si vous avez exécuté la gcloud CLI pour créer le cluster Connect.

  • La création d'un cluster Connect prend généralement entre 20 et 30 minutes. Pour suivre la progression de la création du cluster, la commande gcloud managed-kafka connect-clusters create utilise une opération de longue durée (LRO), que vous pouvez surveiller à l'aide de la commande suivante :

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    Remplacez les éléments suivants :

    • OPERATION_ID par la valeur de l'ID de l'opération de la section précédente.
    • LOCATION par la valeur de l'emplacement de la section précédente.

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.