Memperbarui grup konsumen Google Cloud Managed Service for Apache Kafka

Anda dapat memperbarui grup konsumen Google Cloud Managed Service untuk Apache Kafka guna mengubah offset untuk daftar partisi topik. Hal ini memungkinkan Anda mengontrol pesan yang diterima konsumen dalam grup.

Untuk memperbarui grup konsumen, Anda dapat menggunakan Google Cloud CLI, library klien, Managed Kafka API, atau Apache Kafka API open source. Konsol tidak didukung untuk mengedit grup konsumen. Google Cloud

Sebelum memulai

Untuk memperbarui grup konsumen, pastikan terlebih dahulu bahwa grup tersebut tidak aktif menggunakan pesan. Grup konsumen akan otomatis dihapus oleh Kafka jika tidak pernah menggunakan pesan apa pun, atau saat offset terakhir yang di-commit telah habis masa berlakunya setelah offsets.retention.minutes.

Ikuti langkah-langkah berikut sebelum memperbarui grup konsumen:

  1. Kirim beberapa pesan ke topik tempat grup konsumen Anda membaca pesan.

  2. Mulai grup konsumen Anda untuk memproses beberapa pesan.

  3. Hentikan semua konsumen Anda agar tidak menggunakan pesan. Untuk menghentikan konsumen, tekan Control+C.

Untuk mengetahui informasi selengkapnya tentang cara mengirim dan menggunakan pesan, lihat Membuat dan menggunakan pesan dengan alat command line Kafka.

Peran dan izin yang diperlukan untuk memperbarui grup konsumen

Untuk mendapatkan izin yang diperlukan untuk mengedit grup konsumen, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran yang telah ditentukan ini berisi izin yang diperlukan untuk mengedit grup konsumen Anda. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk mengedit grup konsumen Anda:

  • Memperbarui grup konsumen: managedkafka.consumerGroups.update

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Consumer Group Editor, lihat Peran bawaan Managed Service untuk Apache Kafka.

Memberikan akses BACA ke agen layanan

Untuk memperbarui offset grup konsumen, agen layanan memerlukan akses ke operasi BACA pada resource topik dan grup konsumen. Akses ini dikonfigurasi dengan ACL Apache Kafka.

Jika Anda belum mengonfigurasi ACL Apache Kafka untuk grup konsumen dan topiknya dalam cluster, agen layanan akan memiliki akses ambient ke resource ini. Anda dapat melewati bagian ini.

Jika ACL Apache Kafka dikonfigurasi untuk grup konsumen dan topiknya dalam cluster, agen layanan memerlukan akses ACL eksplisit untuk operasi BACA bagi kedua resource. Untuk melakukannya, tambahkan entri ACL yang memberikan akses agen layanan ke operasi BACA pada grup konsumen dan topik yang relevan. Ikuti langkah-langkah berikut:

  1. Instal Google Cloud CLI.

  2. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  3. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  4. Jalankan perintah gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Ganti kode berikut:

    • CONSUMER_GROUP_ACL_ID (wajib): ID unik resource ACL Managed Service untuk Apache Kafka tempat Anda ingin menambahkan entri ACL untuk grup konsumen. Untuk menerapkan akses ke semua grup konsumen, gunakan `allConsumerGroups`. Atau, untuk grup konsumen tertentu, gunakan `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (wajib): ID unik resource ACL Managed Service untuk Apache Kafka tempat Anda ingin menambahkan entri ACL untuk topik. Untuk menerapkan akses ke semua topik, gunakan `allTopics`. Atau, untuk topik tertentu, gunakan `topic/TOPIC_NAME`.
    • CLUSTER_ID (wajib): ID cluster yang berisi resource ACL.
    • LOCATION (wajib): region tempat cluster berada. Lihat Lokasi yang didukung.
    • PROJECT_NUMBER (wajib): nomor project tempat cluster berada. Nomor ini digunakan untuk membuat nama utama agen layanan untuk entri ACL.

Untuk mengetahui informasi selengkapnya tentang cara menambahkan entri ACL, lihat Menambahkan entri ACL.

Memperbarui grup konsumen

Pastikan Anda telah menyelesaikan langkah-langkah di bagian Sebelum memulai.

Untuk memperbarui grup konsumen, ikuti langkah-langkah berikut:

gcloud

  1. Di konsol, aktifkan Cloud Shell. Google Cloud

    Aktifkan Cloud Shell

    Di bagian bawah konsol Google Cloud , sesi Cloud Shell akan dimulai dan menampilkan prompt command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi pada sesi.

  2. Jalankan perintah gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Ganti kode berikut:

    • CLUSTER_ID: ID atau nama cluster.

    • LOCATION: Lokasi cluster.

    • CONSUMER_GROUP_ID: ID atau nama grup konsumen.

    • TOPICS_FILE: Setelan ini menentukan lokasi file yang berisi konfigurasi topik yang akan diperbarui untuk grup konsumen. File dapat berformat JSON atau YAML. File dapat berupa jalur file atau langsung menyertakan konten JSON atau YAML.

      File topik menggunakan struktur JSON untuk merepresentasikan peta topik ConsumerGroup, dalam bentuk { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Untuk setiap topik, ConsumerPartitionMetadata memberikan offset dan metadata untuk setiap partisi.

      Untuk menetapkan offset untuk satu partisi (partisi 0) dalam topik bernama topic1 ke 10, konfigurasi JSON akan terlihat seperti:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Berikut adalah contoh konten file topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: Saat menentukan topik dalam file JSON atau YAML, sertakan jalur topik lengkap yang dapat diperoleh dengan menjalankan perintah gcloud managed-kafak topics describe dan dalam format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar dari The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.