Anda dapat memperbarui grup konsumen Google Cloud Managed Service for Apache Kafka untuk mengubah offset bagi daftar partisi topik. Tindakan ini memungkinkan Anda mengontrol pesan mana yang diterima konsumen dalam grup.
Untuk mengupdate grup konsumen, Anda dapat menggunakan Google Cloud CLI, library klien, Managed Kafka API, atau Apache Kafka API open source. Konsol Google Cloud tidak didukung untuk mengedit grup konsumen.
Sebelum memulai
Untuk memperbarui grup konsumen, pastikan terlebih dahulu bahwa grup tersebut tidak sedang aktif menggunakan pesan.
Grup konsumen akan otomatis dihapus oleh Kafka jika tidak pernah menggunakan
pesan apa pun, atau saat offset terakhir yang di-commit telah berakhir setelah
offsets.retention.minutes.
Ikuti langkah-langkah berikut sebelum Anda memperbarui grup konsumen:
Kirim beberapa pesan ke topik tempat grup konsumen Anda membaca pesan.
Mulai grup konsumen Anda untuk memproses beberapa pesan.
Menghentikan semua konsumen Anda agar tidak menggunakan pesan. Untuk menghentikan konsumen, tekan Control+C.
Untuk mengetahui informasi selengkapnya tentang mengirim dan menggunakan pesan, lihat Membuat dan menggunakan pesan dengan alat command line Kafka.
Peran dan izin yang diperlukan untuk memperbarui grup konsumen
Untuk mendapatkan izin yang
diperlukan guna mengedit grup konsumen, minta administrator Anda untuk memberi Anda peran IAM
Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor)
di project Anda.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran bawaan ini berisi izin yang diperlukan untuk mengedit grup konsumen Anda. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk mengedit grup konsumen Anda:
-
Perbarui grup konsumen:
managedkafka.consumerGroups.update
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Consumer Group Editor, lihat Peran standar Managed Service for Apache Kafka.
Memberi agen layanan akses BACA
Untuk memperbarui offset grup konsumen, agen layanan memerlukan akses ke operasi BACA pada topik dan resource grup konsumen. Akses ini dikonfigurasi dengan ACL Apache Kafka.
Jika Anda belum mengonfigurasi ACL Apache Kafka untuk grup konsumen dan topiknya dalam cluster, agen layanan memiliki akses ambient ke resource ini. Anda dapat melewati bagian ini.
Jika ACL Apache Kafka dikonfigurasi untuk grup konsumen dan topiknya dalam cluster, agen layanan memerlukan akses ACL eksplisit untuk operasi BACA untuk kedua resource. Untuk melakukannya, tambahkan entri ACL yang memberikan akses agen layanan ke operasi BACA pada grup dan topik konsumen yang relevan. Ikuti langkah-langkah berikut:
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init Jalankan perintah
gcloud managed-kafka acls add-acl-entry:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
Ganti kode berikut:
CONSUMER_GROUP_ACL_ID(wajib): ID unik resource ACL Managed Service untuk Apache Kafka tempat Anda ingin menambahkan entri ACL untuk grup konsumen. Untuk menerapkan akses ke semua grup konsumen, gunakan `allConsumerGroups`. Atau, untuk grup konsumen tertentu, gunakan `consumerGroup/CONSUMER_GROUP_NAME`.TOPIC_ACL_ID(wajib): ID unik dari resource ACL Managed Service untuk Apache Kafka tempat Anda ingin menambahkan entri ACL untuk topik. Untuk menerapkan akses ke semua topik, gunakan `allTopics`. Atau, untuk topik tertentu, gunakan `topic/TOPIC_NAME`.CLUSTER_ID(wajib): ID cluster yang berisi resource ACL.LOCATION(wajib): region tempat cluster berada. Lihat Lokasi yang didukung.PROJECT_NUMBER(wajib): nomor project tempat cluster berada. Ini digunakan untuk membuat nama entity utama agen layanan untuk entri ACL.
Untuk mengetahui informasi selengkapnya tentang cara menambahkan entri ACL, lihat Menambahkan entri ACL.
Memperbarui grup konsumen
Pastikan Anda telah menyelesaikan langkah-langkah di bagian Sebelum memulai.
Untuk memperbarui grup konsumen, ikuti langkah-langkah berikut:
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Jalankan perintah
gcloud managed-kafka consumer-groups update:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
Ganti kode berikut:
-
CLUSTER_ID: ID atau nama cluster.
-
LOCATION: Lokasi cluster.
-
CONSUMER_GROUP_ID: ID atau nama grup konsumen.
-
TOPICS_FILE: Setelan ini menentukan lokasi file yang berisi konfigurasi topik yang akan diperbarui untuk grup konsumen. File dapat berupa format JSON atau YAML. Dapat berupa jalur file atau langsung menyertakan konten JSON atau YAML.
File topik menggunakan struktur JSON untuk merepresentasikan peta topik
ConsumerGroup, dalam bentuk{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Untuk setiap topik,ConsumerPartitionMetadatamemberikan offset dan metadata untuk setiap partisi.Untuk menyetel offset satu partisi (partisi 0) dalam topik bernama
topic1ke 10, konfigurasi JSON akan terlihat seperti:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}Berikut adalah contoh isi file
topics.json:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: Saat menentukan topik dalam file JSON atau YAML, sertakan jalur topik lengkap yang dapat diperoleh dari menjalankan perintah
gcloud managed-kafak topics describedan dalam formatprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .
-