Crea un cluster Kafka

Crea un cluster Kafka

Per saperne di più

Per la documentazione dettagliata che include questo esempio di codice, vedi quanto segue:

Esempio di codice

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida di Managed Service per Apache Kafka per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Managed Service per Apache Kafka Go.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
	// cpu := 3
	// memoryBytes := 3221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)

	// Memory must be between 1 GiB and 8 GiB per CPU.
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   cpu,
		MemoryBytes: memoryBytes,
	}
	var networkConfig []*managedkafkapb.NetworkConfig
	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
		Subnet: subnet,
	})
	platformConfig := &managedkafkapb.Cluster_GcpConfig{
		GcpConfig: &managedkafkapb.GcpConfig{
			AccessConfig: &managedkafkapb.AccessConfig{
				NetworkConfigs: networkConfig,
			},
		},
	}
	rebalanceConfig := &managedkafkapb.RebalanceConfig{
		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
	}
	cluster := &managedkafkapb.Cluster{
		Name:            clusterPath,
		CapacityConfig:  capacityConfig,
		PlatformConfig:  platformConfig,
		RebalanceConfig: rebalanceConfig,
	}

	req := &managedkafkapb.CreateClusterRequest{
		Parent:    locationPath,
		ClusterId: clusterID,
		Cluster:   cluster,
	}
	op, err := client.CreateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di Managed Service per Apache Kafka per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Managed Service per Apache Kafka Java.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.AccessConfig;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.CreateClusterRequest;
import com.google.cloud.managedkafka.v1.GcpConfig;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.NetworkConfig;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.RebalanceConfig;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    int cpu = 3;
    long memoryBytes = 3221225472L; // 3 GiB
    createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
  }

  public static void createCluster(
      String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
    GcpConfig gcpConfig =
        GcpConfig.newBuilder()
            .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    RebalanceConfig rebalanceConfig =
        RebalanceConfig.newBuilder()
            .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
            .build();
    Cluster cluster =
        Cluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setRebalanceConfig(rebalanceConfig)
            .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {

      CreateClusterRequest request =
          CreateClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setClusterId(clusterId)
              .setCluster(cluster)
              .build();

      // The duration of this operation can vary considerably, typically taking between 10-40
      // minutes.
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.createClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone =  True)
      Cluster response = future.get();
      System.out.printf("Created cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di Managed Service per Apache Kafka per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Managed Service per Apache Kafka Python.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 3
# memory_bytes = 3221225472

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.vcpu_count = cpu
cluster.capacity_config.memory_bytes = memory_bytes
cluster.gcp_config.access_config.network_configs = [
    managedkafka_v1.NetworkConfig(subnet=subnet)
]
cluster.rebalance_config.mode = (
    managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
)

request = managedkafka_v1.CreateClusterRequest(
    parent=client.common_location_path(project_id, region),
    cluster_id=cluster_id,
    cluster=cluster,
)

try:
    operation = client.create_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # The duration of this operation can vary considerably, typically taking 10-40 minutes.
    # We can set a timeout of 3000s (50 minutes).
    response = operation.result(timeout=3000)
    print("Created cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Terraform

Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base. Per saperne di più, consulta la documentazione di riferimento del fornitore Terraform.

resource "google_managed_kafka_cluster" "default" {
  project    = data.google_project.default.project_id # Replace this with your project ID in quotes
  cluster_id = "my-cluster-id"
  location   = "us-central1"
  capacity_config {
    vcpu_count   = 3
    memory_bytes = 3221225472
  }
  gcp_config {
    access_config {
      network_configs {
        subnet = google_compute_subnetwork.default.id
      }
    }
  }
}

Passaggi successivi

Per cercare e filtrare gli esempi di codice per altri prodotti Google Cloud , consulta il browser degli esempi diGoogle Cloud .