Puedes actualizar un grupo de consumidores de Google Cloud Managed Service para Apache Kafka para modificar los desplazamientos de una lista de particiones de temas. Esto te permite controlar qué mensajes reciben los consumidores del grupo.
Para actualizar un grupo de consumidores, puedes usar Google Cloud CLI, la biblioteca cliente, la API de Kafka administrado o las APIs de Apache Kafka de código abierto. La consola de Google Cloud no es compatible con la edición de un grupo de consumidores.
Antes de comenzar
Para actualizar un grupo de consumidores, primero asegúrate de que no esté consumiendo mensajes de forma activa.
Kafka borra automáticamente un grupo de consumidores si nunca consumió mensajes o cuando venció el último desplazamiento confirmado después de offsets.retention.minutes.
Sigue estos pasos antes de actualizar un grupo de consumidores:
Envía algunos mensajes al tema desde el que tu grupo de consumidores está leyendo mensajes.
Inicia tu grupo de consumidores para procesar algunos mensajes.
Detiene el consumo de mensajes de todos tus consumidores. Para detener un consumidor, presiona Control+C.
Para obtener más información sobre el envío y el consumo de mensajes, consulta Produce and consume messages with the Kafka command-line tools.
Roles y permisos obligatorios para actualizar un grupo de consumidores
Si quieres obtener los permisos que necesitas para editar tus grupos de consumidores, pídele a tu administrador que te otorgue el rol de IAM de Editor de grupos de consumidores de Kafka administrado (roles/managedkafka.consumerGroupEditor) en tu proyecto.
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
Este rol predefinido contiene los permisos necesarios para editar tus grupos de consumidores. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:
Permisos necesarios
Se requieren los siguientes permisos para editar tus grupos de consumidores:
-
Actualiza los grupos de consumidores:
managedkafka.consumerGroups.update
También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.
Para obtener más información sobre el rol de editor de grupos de consumidores de Kafka administrado, consulta Roles predefinidos de Managed Service para Apache Kafka.
Otorga acceso de LECTURA al agente de servicio
Para actualizar los desplazamientos del grupo de consumidores, el agente de servicio requiere acceso a la operación READ en los recursos del tema y del grupo de consumidores. Este acceso se configura con ACLs de Apache Kafka.
Si no configuraste ninguna ACL de Apache Kafka para el grupo de consumidores y su tema dentro del clúster, el agente de servicio tiene acceso ambiental a estos recursos. Puedes omitir esta sección.
Si se configuran ACL de Apache Kafka para el grupo de consumidores y su tema dentro del clúster, el agente de servicio requiere acceso explícito a la ACL para la operación de LECTURA de ambos recursos. Para ello, agrega entradas de LCA que otorguen al agente de servicio acceso a la operación de READ en el grupo de consumidores y el tema pertinentes. Lleva a cabo los pasos siguientes:
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init Ejecuta el comando
gcloud managed-kafka acls add-acl-entry:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
Reemplaza lo siguiente:
CONSUMER_GROUP_ACL_ID(obligatorio): Es el ID único del recurso de ACL de Managed Service for Apache Kafka en el que deseas agregar la entrada de ACL para el grupo de consumidores. Para aplicar el acceso a todos los grupos de consumidores, usa "allConsumerGroups". Para un grupo de consumidores específico, usa "consumerGroup/NOMBRE_DEL_GRUPO_DE_CONSUMIDORES".TOPIC_ACL_ID(obligatorio): Es el ID único del recurso de ACL de Managed Service for Apache Kafka en el que deseas agregar la entrada de ACL para el tema. Para aplicar el acceso a todos los temas, usa `allTopics`. Para un tema específico, usa `topic/TOPIC_NAME`.CLUSTER_ID(obligatorio): Es el ID del clúster que contiene el recurso de ACL.LOCATION(obligatorio): Es la región en la que se encuentra el clúster. Consulta las ubicaciones admitidas.PROJECT_NUMBER(obligatorio): Es el número del proyecto en el que se encuentra el clúster. Se usa para compilar el nombre principal del agente de servicio para la entrada de la ACL.
Para obtener más información sobre cómo agregar una entrada de LCA, consulta Agrega una entrada de LCA.
Actualiza un grupo de consumidores
Asegúrate de haber completado los pasos de la sección Antes de comenzar.
Para actualizar un grupo de consumidores, sigue estos pasos:
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Ejecuta el comando
gcloud managed-kafka consumer-groups update:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
Reemplaza lo siguiente:
-
CLUSTER_ID: Es el ID o el nombre del clúster.
-
LOCATION: Es la ubicación del clúster.
-
CONSUMER_GROUP_ID: Es el ID o el nombre del grupo de consumidores.
-
TOPICS_FILE: Este parámetro de configuración especifica la ubicación del archivo que contiene la configuración de los temas que se actualizarán para el grupo de consumidores. El archivo puede estar en formato JSON o YAML. Puede ser una ruta de acceso al archivo o incluir directamente el contenido JSON o YAML.
El archivo de temas usa una estructura JSON para representar un mapa de temas
ConsumerGroup, en el formato{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Para cada tema,ConsumerPartitionMetadataproporciona el desplazamiento y los metadatos de cada partición.Para establecer el desplazamiento de una sola partición (partición 0) en un tema llamado
topic1en 10, la configuración JSON se vería de la siguiente manera:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}A continuación, se muestra un ejemplo del contenido de un archivo
topics.json:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: Cuando especifiques temas en un archivo JSON o YAML, incluye la ruta de acceso completa al tema, que se puede obtener ejecutando el comando
gcloud managed-kafak topics describey que tiene el formatoprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .
-