Actualiza un grupo de consumidores de Google Cloud Managed Service para Apache Kafka

Puedes actualizar un grupo de consumidores de Google Cloud Managed Service para Apache Kafka para modificar los desplazamientos de una lista de particiones de temas. Esto te permite controlar qué mensajes reciben los consumidores del grupo.

Para actualizar un grupo de consumidores, puedes usar Google Cloud CLI, la biblioteca cliente, la API de Kafka administrado o las APIs de Apache Kafka de código abierto. La consola de Google Cloud no es compatible con la edición de un grupo de consumidores.

Antes de comenzar

Para actualizar un grupo de consumidores, primero asegúrate de que no esté consumiendo mensajes de forma activa. Kafka borra automáticamente un grupo de consumidores si nunca consumió mensajes o cuando venció el último desplazamiento confirmado después de offsets.retention.minutes.

Sigue estos pasos antes de actualizar un grupo de consumidores:

  1. Envía algunos mensajes al tema desde el que tu grupo de consumidores está leyendo mensajes.

  2. Inicia tu grupo de consumidores para procesar algunos mensajes.

  3. Detiene el consumo de mensajes de todos tus consumidores. Para detener un consumidor, presiona Control+C.

Para obtener más información sobre el envío y el consumo de mensajes, consulta Produce and consume messages with the Kafka command-line tools.

Roles y permisos obligatorios para actualizar un grupo de consumidores

Si quieres obtener los permisos que necesitas para editar tus grupos de consumidores, pídele a tu administrador que te otorgue el rol de IAM de Editor de grupos de consumidores de Kafka administrado (roles/managedkafka.consumerGroupEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para editar tus grupos de consumidores. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para editar tus grupos de consumidores:

  • Actualiza los grupos de consumidores: managedkafka.consumerGroups.update

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de editor de grupos de consumidores de Kafka administrado, consulta Roles predefinidos de Managed Service para Apache Kafka.

Otorga acceso de LECTURA al agente de servicio

Para actualizar los desplazamientos del grupo de consumidores, el agente de servicio requiere acceso a la operación READ en los recursos del tema y del grupo de consumidores. Este acceso se configura con ACLs de Apache Kafka.

Si no configuraste ninguna ACL de Apache Kafka para el grupo de consumidores y su tema dentro del clúster, el agente de servicio tiene acceso ambiental a estos recursos. Puedes omitir esta sección.

Si se configuran ACL de Apache Kafka para el grupo de consumidores y su tema dentro del clúster, el agente de servicio requiere acceso explícito a la ACL para la operación de LECTURA de ambos recursos. Para ello, agrega entradas de LCA que otorguen al agente de servicio acceso a la operación de READ en el grupo de consumidores y el tema pertinentes. Lleva a cabo los pasos siguientes:

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Ejecuta el comando gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Reemplaza lo siguiente:

    • CONSUMER_GROUP_ACL_ID (obligatorio): Es el ID único del recurso de ACL de Managed Service for Apache Kafka en el que deseas agregar la entrada de ACL para el grupo de consumidores. Para aplicar el acceso a todos los grupos de consumidores, usa "allConsumerGroups". Para un grupo de consumidores específico, usa "consumerGroup/NOMBRE_DEL_GRUPO_DE_CONSUMIDORES".
    • TOPIC_ACL_ID (obligatorio): Es el ID único del recurso de ACL de Managed Service for Apache Kafka en el que deseas agregar la entrada de ACL para el tema. Para aplicar el acceso a todos los temas, usa `allTopics`. Para un tema específico, usa `topic/TOPIC_NAME`.
    • CLUSTER_ID (obligatorio): Es el ID del clúster que contiene el recurso de ACL.
    • LOCATION (obligatorio): Es la región en la que se encuentra el clúster. Consulta las ubicaciones admitidas.
    • PROJECT_NUMBER (obligatorio): Es el número del proyecto en el que se encuentra el clúster. Se usa para compilar el nombre principal del agente de servicio para la entrada de la ACL.

Para obtener más información sobre cómo agregar una entrada de LCA, consulta Agrega una entrada de LCA.

Actualiza un grupo de consumidores

Asegúrate de haber completado los pasos de la sección Antes de comenzar.

Para actualizar un grupo de consumidores, sigue estos pasos:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Reemplaza lo siguiente:

    • CLUSTER_ID: Es el ID o el nombre del clúster.

    • LOCATION: Es la ubicación del clúster.

    • CONSUMER_GROUP_ID: Es el ID o el nombre del grupo de consumidores.

    • TOPICS_FILE: Este parámetro de configuración especifica la ubicación del archivo que contiene la configuración de los temas que se actualizarán para el grupo de consumidores. El archivo puede estar en formato JSON o YAML. Puede ser una ruta de acceso al archivo o incluir directamente el contenido JSON o YAML.

      El archivo de temas usa una estructura JSON para representar un mapa de temas ConsumerGroup, en el formato { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Para cada tema, ConsumerPartitionMetadata proporciona el desplazamiento y los metadatos de cada partición.

      Para establecer el desplazamiento de una sola partición (partición 0) en un tema llamado topic1 en 10, la configuración JSON se vería de la siguiente manera:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      A continuación, se muestra un ejemplo del contenido de un archivo topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: Cuando especifiques temas en un archivo JSON o YAML, incluye la ruta de acceso completa al tema, que se puede obtener ejecutando el comando gcloud managed-kafak topics describe y que tiene el formato projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.