Google Cloud Managed Service for Apache Kafka-Cluster erstellen

Ein Managed Service for Apache Kafka-Cluster bietet eine Umgebung zum Speichern und Verarbeiten von Nachrichtenstreams, die in Themen organisiert sind.

Sie können einen Cluster über die Google Cloud -Konsole, die Google Cloud CLI, die Clientbibliothek oder die Managed Kafka API erstellen. Sie können keinen Cluster mit der Open-Source-Apache Kafka API erstellen.

Hinweise

Prüfen Sie, ob Sie mit Folgendem vertraut sind:

Erforderliche Rollen und Berechtigungen zum Erstellen eines Clusters

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Clusters benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Clusters erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines Clusters erforderlich:

  • Cluster erstellen: managedkafka.clusters.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Mit der Rolle „Managed Kafka Cluster Editor“ können Sie keine Themen und Nutzergruppen in Managed Service for Apache Kafka-Clustern erstellen, löschen oder ändern. Außerdem ist damit kein Zugriff auf die Datenebene möglich, um Nachrichten in Clustern zu veröffentlichen oder zu nutzen. Weitere Informationen zu dieser Rolle finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Eigenschaften eines Managed Service for Apache Kafka-Clusters

Wenn Sie einen Managed Service for Apache Kafka-Cluster erstellen oder aktualisieren, müssen Sie die folgenden Eigenschaften angeben.

Clustername

Der Name oder die ID des Managed Service for Apache Kafka-Clusters, den Sie erstellen. Tipps zum Benennen von Clustern finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Clusters kann nicht geändert werden.

Standort

Der Standort, an dem Sie den Cluster erstellen. Der Standort muss eine der unterstützten Google Cloud Regionen sein. Der Clusterstandort kann im Nachhinein nicht mehr geändert werden. Eine Liste der verfügbaren Standorte finden Sie unter Managed Service for Apache Kafka-Standorte.

Kapazitätskonfiguration

Im Rahmen der Kapazitätskonfiguration müssen Sie die Anzahl der vCPUs und die Größe des Arbeitsspeichers für die Kafka-Einrichtung festlegen. Weitere Informationen zum Konfigurieren der Kapazität eines Clusters finden Sie unter Größe des Kafka-Clusters planen.

Die Eigenschaften für die Kapazitätskonfiguration sind:

  • vCPUs: Die Anzahl der vCPUs im Cluster. Mindestens 3 vCPUs pro Cluster sind erforderlich.

  • Arbeitsspeicher: Die Menge an Arbeitsspeicher, die dem Cluster zugewiesen ist. Sie müssen zwischen 1 GiB und 8 GiB pro vCPU bereitstellen.

    Wenn Sie beispielsweise einen Cluster mit 6 vCPUs erstellen, beträgt der minimale Arbeitsspeicher, den Sie dem Cluster zuweisen können, 6 GiB (1 GiB pro vCPU) und der maximale Arbeitsspeicher 48 GiB (8 GiB pro vCPU).

Weitere Informationen zum Ändern des Arbeitsspeichers und der Anzahl der vCPUs nach dem Erstellen eines Clusters finden Sie unter Clustergröße aktualisieren.

Netzwerkkonfiguration

Die Netzwerkkonfiguration ist eine Liste der VPC-Subnetze, in denen der Cluster zugänglich ist. Damit Clients Nachrichten erstellen oder empfangen können, müssen sie eines dieser Subnetze erreichen können.

Hier sind einige Richtlinien für Ihre Netzwerkkonfiguration:

  • Für einen Cluster ist mindestens ein Subnetz erforderlich. Die maximale Anzahl ist zehn.

  • Pro Netzwerk ist für einen bestimmten Cluster genau ein Subnetz zulässig.

  • Jedes Subnetz muss sich in derselben Region wie der Cluster befinden. Das Projekt und das Netzwerk können unterschiedlich sein.

  • IP-Adressen für die Broker und den Bootstrap-Server werden in jedem Subnetz automatisch zugewiesen. Außerdem werden DNS-Einträge für diese IP-Adressen in den entsprechenden VPC-Netzwerken erstellt.

  • Wenn Sie ein Subnetz aus einem anderen Projekt hinzufügen, müssen Sie dem von Google verwalteten Dienstkonto, das dem Cluster zugeordnet ist, Berechtigungen erteilen. Weitere Informationen finden Sie unter Cluster projektübergreifend verbinden.

Nachdem Sie den Cluster erstellt haben, können Sie die Liste der Subnetze aktualisieren. Weitere Informationen zum Netzwerk finden Sie unter Netzwerk für Managed Service for Apache Kafka konfigurieren.

Labels

Labels sind Schlüssel/Wert-Paare, die Ihnen bei der Organisation und Identifizierung helfen. Mit Labels können Ressourcen nach Umgebung kategorisiert werden. Beispiele sind "env:production" und "owner:data-engineering".

Sie können Ressourcen anhand ihrer Labels filtern und nach ihnen suchen. Angenommen, Sie haben mehrere Managed Service for Apache Kafka-Cluster für verschiedene Abteilungen. Sie können Cluster mit dem Label "department:marketing" konfigurieren und danach suchen, um den relevanten Cluster schnell zu finden.

Konfiguration für Neuausgleich

Mit dieser Einstellung wird festgelegt, ob der Dienst Partitionen automatisch auf Broker verteilt.

Folgende Modi sind verfügbar:

  • Automatische Lastverteilung bei Hochskalierung: Wenn diese Option aktiviert ist, wird automatisch eine Lastverteilung von Replikaten ausgelöst, wenn Sie den Cluster hochskalieren. Dieser Modus trägt zu einer gleichmäßigen Lastverteilung bei, kann aber während des Rebalancing-Vorgangs vorübergehend die Leistung beeinträchtigen.

  • Kein Neuausgleich: Wenn diese Option aktiviert ist, werden Replikate nicht automatisch neu ausgeglichen.

Verschlüsselung

Managed Service for Apache Kafka kann Nachrichten mitGoogle-owned and Google-managed encryption keys (Standard) oder mit vom Kunden verwalteten Verschlüsselungsschlüsseln (Customer-Managed Encryption Keys, CMEK) verschlüsseln. Jede Nachricht wird im Ruhezustand und während der Übertragung verschlüsselt. Der Verschlüsselungstyp für einen Cluster ist unveränderlich.

Standardmäßig wird Google-owned and Google-managed encryption keys verwendet. Diese Schlüssel werden vollständig von Google Cloud in seiner Infrastruktur erstellt, verwaltet und gespeichert.

CMEKs sind Verschlüsselungsschlüssel, die Sie mit Cloud Key Management Service verwalten. Mit dieser Funktion haben Sie mehr Kontrolle über die Schlüssel, die zum Verschlüsseln inaktiver Daten in unterstützten Google Cloud Diensten verwendet werden. Die Verwendung von CMEK verursacht zusätzliche Kosten im Zusammenhang mit Cloud Key Management Service. Wenn Sie CMEK verwenden, muss sich Ihr Schlüsselbund am selben Speicherort wie die Ressourcen befinden, für die Sie ihn verwenden. Weitere Informationen finden Sie unter Nachrichtenverschlüsselung konfigurieren.

mTLS-Konfiguration

Sie können mTLS optional als alternative Authentifizierungsmethode konfigurieren, bei der Clientzertifikate verwendet werden. Die Konfiguration umfasst Folgendes:

  • CA-Pools: Eine Liste mit ein bis zehn CAS-Pools (Certificate Authority Service), denen der Cluster für die Clientauthentifizierung vertraut.

  • SSL-Principal-Zuordnungsregeln: Eine optionale, aber empfohlene ssl.principal.mapping.rules-Broker-Eigenschaft, um lange Zertifikat-Principal-Namen für die Verwendung in Kafka-ACLs zu vereinfachen.

Weitere Informationen zu mTLS finden Sie unter mTLS-Authentifizierung konfigurieren.

Cluster erstellen

Bevor Sie einen Cluster erstellen, sollten Sie die Dokumentation zu Cluster-Properties lesen.

Das Erstellen eines Clusters dauert in der Regel 20 bis 30 Minuten.

So erstellen Sie einen Cluster:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Wählen Sie Erstellen aus.

    Die Seite Kafka-Cluster erstellen wird geöffnet.

  3. Geben Sie für Clustername einen String ein.

    Weitere Informationen zum Benennen eines Clusters finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource.

  4. Geben Sie unter Standort einen unterstützten Standort ein.

    Weitere Informationen zu unterstützten Standorten finden Sie unter Unterstützte Standorte für Managed Service for Apache Kafka.

  5. Geben Sie für die Kapazitätskonfiguration Werte für Arbeitsspeicher und vCPUs ein.

    Weitere Informationen zum Festlegen der Größe eines Managed Service for Apache Kafka-Clusters finden Sie unter Größe des Kafka-Clusters planen.

  6. Geben Sie unter Netzwerkkonfiguration die folgenden Details ein:

    1. Projekt: Das Projekt, in dem sich das Subnetzwerk befindet. Das Subnetz muss sich in derselben Region wie der Cluster befinden, das Projekt kann jedoch ein anderes sein.
    2. Netzwerk: Das Netzwerk, mit dem das Subnetz verbunden ist.
    3. Subnetzwerk: Der Name des Subnetzes.
    4. Subnetz-URI-Pfad: Dieses Feld wird automatisch ausgefüllt. Alternativ können Sie hier den Subnetzpfad eingeben. Der Name des Subnetzes muss das Format projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID haben.
    5. Klicken Sie auf Fertig.
  7. Optional: Sie können weitere Subnetze hinzufügen. Klicken Sie dazu auf Verbundenes Subnetz hinzufügen.

    Sie können bis zu zehn weitere Subnetze hinzufügen.

  8. Behalten Sie die anderen Standardwerte bei.

  9. Klicken Sie auf Erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka clusters create aus:

    gcloud managed-kafka clusters create CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --encryption-key=ENCRYPTION_KEY \
        --async \
        --labels=LABELS
    

    Ersetzen Sie Folgendes:

    • CLUSTER_ID: Die ID oder der Name des Clusters.

      Weitere Informationen zum Benennen eines Clusters finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource.

    • LOCATION: Der Standort des Clusters.

      Weitere Informationen zu unterstützten Standorten finden Sie unter Managed Service for Apache Kafka-Standorte.

    • CPU: Die Anzahl der vCPUs für den Cluster.

      Weitere Informationen zum Festlegen der Größe eines Managed Service for Apache Kafka-Clusters finden Sie unter Größe des Kafka-Clusters planen.

    • MEMORY: Die Größe des Arbeitsspeichers für den Cluster. Verwenden Sie die Einheiten „MB“, „MiB“, „GB“, „GiB“, „TB“ oder „TiB“. Beispiel: „10 GiB“.

    • SUBNETS: Die Liste der Subnetze, mit denen eine Verbindung hergestellt werden soll. Trennen Sie mehrere Subnetzwerte durch Kommas.

      Das Format des Subnetzes ist projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

    • auto-rebalance: Aktiviert den automatischen Ausgleich von Themenpartitionen unter Brokern, wenn sich die Anzahl der CPUs im Cluster ändert. Diese Einstellung ist standardmäßig aktiviert.

    • ENCRYPTION_KEY: ID des vom Kunden verwalteten Verschlüsselungsschlüssels, der für den Cluster verwendet werden soll.

      Das Format dafür ist projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY.

    • --async: Das System sendet die Erstellungsanfrage und gibt sofort eine Antwort zurück, ohne auf den Abschluss des Vorgangs zu warten. Mit dem Flag --async können Sie mit anderen Aufgaben fortfahren, während das Cluster im Hintergrund erstellt wird. Wenn Sie das Flag nicht verwenden, wartet das System, bis der Vorgang abgeschlossen ist, bevor eine Antwort zurückgegeben wird. Sie müssen warten, bis der Cluster vollständig aktualisiert wurde, bevor Sie mit anderen Aufgaben fortfahren können.

    • LABELS: Labels, die dem Cluster zugeordnet werden sollen.

      Weitere Informationen zum Format von Labels finden Sie unter Labels.

    Sie erhalten eine Antwort ähnlich der folgenden:

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.
    

    Speichere die OPERATION_ID, um progress zu verfolgen.

  3. REST

    Ersetzen Sie diese Werte in den folgenden Anfragedaten:

    • PROJECT_ID: Ihre Google Cloud Projekt-ID
    • LOCATION: Der Standort des Clusters
    • CLUSTER_ID: Die ID des Clusters
    • CPU_COUNT: die Anzahl der vCPUs für den Cluster
    • MEMORY: Die Größe des Arbeitsspeichers für den Cluster in Byte. Beispiel: 3221225472.
    • SUBNET_ID: Subnetz-ID des Subnetzes, mit dem eine Verbindung hergestellt werden soll. Beispiel: default.

    HTTP-Methode und URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters?clusterId=CLUSTER_ID

    JSON-Text anfordern:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

    Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Terraform

    Sie können eine Terraform-Ressource verwenden, um einen Cluster zu erstellen.

    resource "google_managed_kafka_cluster" "default" {
      project    = data.google_project.default.project_id # Replace this with your project ID in quotes
      cluster_id = "my-cluster-id"
      location   = "us-central1"
      capacity_config {
        vcpu_count   = 3
        memory_bytes = 3221225472
      }
      gcp_config {
        access_config {
          network_configs {
            subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
    	// cpu := 3
    	// memoryBytes := 3221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)
    
    	// Memory must be between 1 GiB and 8 GiB per CPU.
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   cpu,
    		MemoryBytes: memoryBytes,
    	}
    	var networkConfig []*managedkafkapb.NetworkConfig
    	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
    		Subnet: subnet,
    	})
    	platformConfig := &managedkafkapb.Cluster_GcpConfig{
    		GcpConfig: &managedkafkapb.GcpConfig{
    			AccessConfig: &managedkafkapb.AccessConfig{
    				NetworkConfigs: networkConfig,
    			},
    		},
    	}
    	rebalanceConfig := &managedkafkapb.RebalanceConfig{
    		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:            clusterPath,
    		CapacityConfig:  capacityConfig,
    		PlatformConfig:  platformConfig,
    		RebalanceConfig: rebalanceConfig,
    	}
    
    	req := &managedkafkapb.CreateClusterRequest{
    		Parent:    locationPath,
    		ClusterId: clusterID,
    		Cluster:   cluster,
    	}
    	op, err := client.CreateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.AccessConfig;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.CreateClusterRequest;
    import com.google.cloud.managedkafka.v1.GcpConfig;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.NetworkConfig;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.RebalanceConfig;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        int cpu = 3;
        long memoryBytes = 3221225472L; // 3 GiB
        createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
      }
    
      public static void createCluster(
          String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig =
            CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
        NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
        GcpConfig gcpConfig =
            GcpConfig.newBuilder()
                .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
                .build();
        RebalanceConfig rebalanceConfig =
            RebalanceConfig.newBuilder()
                .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
                .build();
        Cluster cluster =
            Cluster.newBuilder()
                .setCapacityConfig(capacityConfig)
                .setGcpConfig(gcpConfig)
                .setRebalanceConfig(rebalanceConfig)
                .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
    
          CreateClusterRequest request =
              CreateClusterRequest.newBuilder()
                  .setParent(LocationName.of(projectId, region).toString())
                  .setClusterId(clusterId)
                  .setCluster(cluster)
                  .build();
    
          // The duration of this operation can vary considerably, typically taking between 10-40
          // minutes.
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.createClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone =  True)
          Cluster response = future.get();
          System.out.printf("Created cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 3
    # memory_bytes = 3221225472
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.vcpu_count = cpu
    cluster.capacity_config.memory_bytes = memory_bytes
    cluster.gcp_config.access_config.network_configs = [
        managedkafka_v1.NetworkConfig(subnet=subnet)
    ]
    cluster.rebalance_config.mode = (
        managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
    )
    
    request = managedkafka_v1.CreateClusterRequest(
        parent=client.common_location_path(project_id, region),
        cluster_id=cluster_id,
        cluster=cluster,
    )
    
    try:
        operation = client.create_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # The duration of this operation can vary considerably, typically taking 10-40 minutes.
        # We can set a timeout of 3000s (50 minutes).
        response = operation.result(timeout=3000)
        print("Created cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

Vorgang zur Clustererstellung überwachen

Sie können den folgenden Befehl nur ausführen, wenn Sie den Cluster mit der gcloud CLI erstellt haben.

  • Das Erstellen eines Clusters dauert in der Regel 20 bis 30 Minuten. Um den Fortschritt der Clustererstellung zu verfolgen, verwendet der Befehl gcloud managed-kafka clusters create einen Vorgang mit langer Ausführungszeit (Long-Running Operation, LRO), den Sie mit dem folgenden Befehl überwachen können:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    Ersetzen Sie Folgendes:

    • OPERATION_ID mit dem Wert der Vorgangs-ID aus dem vorherigen Abschnitt.
    • LOCATION durch den Wert des Standorts aus dem vorherigen Abschnitt.

Fehlerbehebung

Im Folgenden finden Sie einige Fehler, die beim Erstellen von Clustern auftreten können.

Service agent service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com has not been granted the required role cloudkms.cryptoKeyEncrypterDecrypter to encrypt data using the KMS key.

Dem Dienst-Agent für Managed Service for Apache Kafka fehlt die erforderliche Berechtigung für den Zugriff auf den Cloud KMS-Schlüssel. Weitere Informationen finden Sie in der Dokumentation zu erforderlichen Rollen zum Konfigurieren von CMEK.

Service does not have permission to retrieve subnet. Please grant service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com the managedkafka.serviceAgent role in the IAM policy of the project ${SUBNET_PROJECT} and ensure the Compute Engine API is enabled in project ${SUBNET_PROJECT}

Dem Dienst-Agent von Managed Service for Apache Kafka fehlt die erforderliche Rolle zum Konfigurieren des Netzwerks im VPC-Netzwerk, in dem die Kafka-Clients ausgeführt werden. Weitere Informationen finden Sie unter Cluster projektübergreifend verbinden.

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.