Memperbarui grup konsumen Google Cloud Managed Service for Apache Kafka

Anda dapat memperbarui grup konsumen Google Cloud Managed Service for Apache Kafka untuk mengubah offset bagi daftar partisi topik. Tindakan ini memungkinkan Anda mengontrol pesan mana yang diterima konsumen dalam grup.

Untuk mengupdate grup konsumen, Anda dapat menggunakan Google Cloud CLI, library klien, Managed Kafka API, atau Apache Kafka API open source. Konsol Google Cloud tidak didukung untuk mengedit grup konsumen.

Sebelum memulai

Untuk memperbarui grup konsumen, pastikan terlebih dahulu bahwa grup tersebut tidak sedang aktif menggunakan pesan. Grup konsumen akan otomatis dihapus oleh Kafka jika tidak pernah menggunakan pesan apa pun, atau saat offset terakhir yang di-commit telah berakhir setelah offsets.retention.minutes.

Ikuti langkah-langkah berikut sebelum Anda memperbarui grup konsumen:

  1. Kirim beberapa pesan ke topik tempat grup konsumen Anda membaca pesan.

  2. Mulai grup konsumen Anda untuk memproses beberapa pesan.

  3. Menghentikan semua konsumen Anda agar tidak menggunakan pesan. Untuk menghentikan konsumen, tekan Control+C.

Untuk mengetahui informasi selengkapnya tentang mengirim dan menggunakan pesan, lihat Membuat dan menggunakan pesan dengan alat command line Kafka.

Peran dan izin yang diperlukan untuk memperbarui grup konsumen

Untuk mendapatkan izin yang diperlukan guna mengedit grup konsumen, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk mengedit grup konsumen Anda. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk mengedit grup konsumen Anda:

  • Perbarui grup konsumen: managedkafka.consumerGroups.update

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Consumer Group Editor, lihat Peran standar Managed Service for Apache Kafka.

Memberi agen layanan akses BACA

Untuk memperbarui offset grup konsumen, agen layanan memerlukan akses ke operasi BACA pada topik dan resource grup konsumen. Akses ini dikonfigurasi dengan ACL Apache Kafka.

Jika Anda belum mengonfigurasi ACL Apache Kafka untuk grup konsumen dan topiknya dalam cluster, agen layanan memiliki akses ambient ke resource ini. Anda dapat melewati bagian ini.

Jika ACL Apache Kafka dikonfigurasi untuk grup konsumen dan topiknya dalam cluster, agen layanan memerlukan akses ACL eksplisit untuk operasi BACA untuk kedua resource. Untuk melakukannya, tambahkan entri ACL yang memberikan akses agen layanan ke operasi BACA pada grup dan topik konsumen yang relevan. Ikuti langkah-langkah berikut:

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Jalankan perintah gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Ganti kode berikut:

    • CONSUMER_GROUP_ACL_ID (wajib): ID unik resource ACL Managed Service untuk Apache Kafka tempat Anda ingin menambahkan entri ACL untuk grup konsumen. Untuk menerapkan akses ke semua grup konsumen, gunakan `allConsumerGroups`. Atau, untuk grup konsumen tertentu, gunakan `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (wajib): ID unik dari resource ACL Managed Service untuk Apache Kafka tempat Anda ingin menambahkan entri ACL untuk topik. Untuk menerapkan akses ke semua topik, gunakan `allTopics`. Atau, untuk topik tertentu, gunakan `topic/TOPIC_NAME`.
    • CLUSTER_ID (wajib): ID cluster yang berisi resource ACL.
    • LOCATION (wajib): region tempat cluster berada. Lihat Lokasi yang didukung.
    • PROJECT_NUMBER (wajib): nomor project tempat cluster berada. Ini digunakan untuk membuat nama entity utama agen layanan untuk entri ACL.

Untuk mengetahui informasi selengkapnya tentang cara menambahkan entri ACL, lihat Menambahkan entri ACL.

Memperbarui grup konsumen

Pastikan Anda telah menyelesaikan langkah-langkah di bagian Sebelum memulai.

Untuk memperbarui grup konsumen, ikuti langkah-langkah berikut:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Ganti kode berikut:

    • CLUSTER_ID: ID atau nama cluster.

    • LOCATION: Lokasi cluster.

    • CONSUMER_GROUP_ID: ID atau nama grup konsumen.

    • TOPICS_FILE: Setelan ini menentukan lokasi file yang berisi konfigurasi topik yang akan diperbarui untuk grup konsumen. File dapat berupa format JSON atau YAML. Dapat berupa jalur file atau langsung menyertakan konten JSON atau YAML.

      File topik menggunakan struktur JSON untuk merepresentasikan peta topik ConsumerGroup, dalam bentuk { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Untuk setiap topik, ConsumerPartitionMetadata memberikan offset dan metadata untuk setiap partisi.

      Untuk menyetel offset satu partisi (partisi 0) dalam topik bernama topic1 ke 10, konfigurasi JSON akan terlihat seperti:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Berikut adalah contoh isi file topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: Saat menentukan topik dalam file JSON atau YAML, sertakan jalur topik lengkap yang dapat diperoleh dari menjalankan perintah gcloud managed-kafak topics describe dan dalam format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.