Mengembangkan aplikasi produsen Python

Pelajari cara mengembangkan aplikasi produsen Python yang melakukan autentikasi dengan cluster Managed Service untuk Apache Kafka menggunakan Kredensial Default Aplikasi (ADC). ADC memungkinkan aplikasi yang berjalan di Google Cloud secara otomatis menemukan dan menggunakan kredensial yang tepat untuk melakukan autentikasi ke Google Cloud layanan.

Sebelum memulai

Sebelum memulai tutorial ini, buat cluster Managed Service untuk Apache Kafka baru. Jika sudah memiliki cluster, Anda dapat melewati langkah ini.

Cara membuat cluster

Konsol

  1. Buka halaman Managed Service untuk Apache Kafka > Cluster.

    Buka Cluster

  2. Klik Buat.
  3. Di kotak Nama cluster, masukkan nama untuk cluster.
  4. Dalam daftar Region, pilih lokasi untuk cluster.
  5. Untuk Konfigurasi jaringan, konfigurasikan subnet tempat cluster dapat diakses:
    1. Untuk Project, pilih project Anda.
    2. Untuk Network, pilih jaringan VPC.
    3. Untuk Subnet, pilih subnet.
    4. Klik Selesai.
  6. Klik Buat.

Setelah Anda mengklik Buat, status cluster adalah Creating. Saat cluster siap, statusnya adalah Active.

gcloud

Untuk membuat cluster Kafka, jalankan perintah managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Ganti kode berikut:

  • KAFKA_CLUSTER: nama untuk cluster Kafka
  • REGION: lokasi cluster
  • PROJECT_ID: project ID Anda
  • SUBNET_NAME: subnet tempat Anda ingin membuat cluster, misalnya default

Untuk mengetahui informasi tentang lokasi yang didukung, lihat Lokasi Managed Service untuk Apache Kafka.

Perintah ini berjalan secara asinkron dan menampilkan ID operasi:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

Untuk melacak progres operasi pembuatan, gunakan perintah gcloud managed-kafka operations describe:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

Saat cluster siap, output dari perintah ini akan menyertakan entri state: ACTIVE. Untuk mengetahui informasi selengkapnya, lihat Memantau operasi pembuatan cluster.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk membuat dan mengonfigurasi VM klien, minta administrator untuk memberi Anda peran IAM berikut pada project:

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Menyiapkan VM klien

Buat instance virtual machine (VM) Linux di Compute Engine yang dapat mengakses cluster Kafka. Saat mengonfigurasi VM, tetapkan opsi berikut:

  • Region. Buat VM di region yang sama dengan cluster Kafka Anda.

  • Subnet. Buat VM di jaringan VPC yang sama dengan subnet yang Anda gunakan dalam konfigurasi cluster Kafka. Untuk mengetahui informasi selengkapnya, lihat Melihat subnet cluster.

  • Access scopes. Tetapkan https://www.googleapis.com/auth/cloud-platform cakupan akses ke VM. Cakupan ini mengotorisasi VM untuk mengirim permintaan ke Managed Kafka API.

Langkah-langkah berikut menunjukkan cara menetapkan opsi ini.

Konsol

  1. Di Google Cloud konsol, buka halaman Buat instance.

    Membuat instance

  2. Di panel Machine configuration, lakukan hal berikut:

    1. Di kolom Name, tentukan nama untuk instance Anda. Untuk mengetahui informasi selengkapnya, lihat Konvensi penamaan resource.

    2. Dalam daftar Region, pilih region yang sama dengan cluster Kafka Anda.

    3. Dalam daftar Zone, pilih zona.

  3. Di menu navigasi, klik Networking. Di panel Networking yang muncul, lakukan hal berikut:

    1. Buka bagian Network interfaces.

    2. Untuk meluaskan antarmuka jaringan default, klik the arrow.

    3. Di kolom Network, pilih jaringan VPC.

    4. Dalam daftar Subnetwork, pilih subnet.

    5. Klik Selesai.

  4. Di menu navigasi, klik Security. Di panel Security yang muncul, lakukan hal berikut:

    1. Untuk Access scopes, pilih Set access for each API.

    2. Dalam daftar cakupan akses, temukan menu drop-down Cloud Platform , lalu pilih Enabled.

  5. Klik Create untuk membuat VM.

gcloud

Untuk membuat instance VM, gunakan gcloud compute instances create perintah.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

Ganti kode berikut:

  • VM_NAME: nama VM
  • PROJECT_ID: project ID Anda
  • REGION: region tempat Anda membuat cluster Kafka, misalnya us-central1
  • SUBNET: subnet di jaringan VPC yang sama dengan subnet yang Anda gunakan dalam konfigurasi cluster
  • ZONE: zona di region tempat Anda membuat cluster, misalnya us-central1-c

Untuk mengetahui informasi selengkapnya tentang cara membuat VM, lihat Membuat instance VM di subnet tertentu.

Memberikan peran IAM

Berikan peran Identity and Access Management (IAM) berikut ke akun layanan default Compute Engine:

  • Managed Kafka Client (roles/managedkafka.client)
  • Service Account Token Creator (roles/iam.serviceAccountTokenCreator)
  • Service Account OpenID Token Creator (roles/iam.serviceAccountOpenIdTokenCreator)

Konsol

  1. Di Google Cloud konsol, buka halaman IAM.

    Buka IAM

  2. Temukan baris untuk Compute Engine default service account , lalu klik Edit principal.

  3. Klik Add another role , lalu pilih peran Managed Kafka Client. Ulangi langkah ini untuk peran Service Account Token Creator dan Service Account OpenID Token Creator.

  4. Klik Simpan.

gcloud

Untuk memberikan peran IAM, gunakan gcloud projects add-iam-policy-binding perintah.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

Ganti kode berikut:

  • PROJECT_ID: project ID Anda

  • PROJECT_NUMBER: nomor project Anda

Untuk mendapatkan nomor project, jalankan perintah gcloud projects describe:

gcloud projects describe PROJECT_ID

Untuk mengetahui informasi selengkapnya, lihat Menemukan nama, nomor, dan ID project.

Terhubung ke VM

Gunakan SSH untuk terhubung ke instance VM.

Konsol

  1. Buka halaman VM instances.

    Buka instance VM

  2. Dalam daftar instance VM, temukan nama VM, lalu klik SSH.

gcloud

Untuk terhubung ke VM, gunakan perintah gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

Ganti kode berikut:

  • VM_NAME: nama VM
  • PROJECT_ID: project ID Anda
  • ZONE: zona tempat Anda membuat VM

Konfigurasi tambahan mungkin diperlukan untuk penggunaan SSH pertama kali. Untuk mengetahui informasi selengkapnya, lihat Tentang koneksi SSH.

Membuat aplikasi produsen Python

Dari sesi SSH Anda, jalankan perintah berikut untuk membuat aplikasi produsen.

  1. Instal pip, pengelola paket Python, dan pengelola lingkungan virtual:

    sudo apt install python3-pip -y
    sudo apt install python3-venv -y
    
  2. Buat lingkungan virtual (venv) baru dan aktifkan:

    python3 -m venv kafka
    source kafka/bin/activate
    
  3. Instal klien confluent-kafka dan dependensi lainnya:

    pip install confluent-kafka google-auth urllib3 packaging
    
  4. Salin kode klien produsen berikut ke dalam file bernama producer.py

    import confluent_kafka
    import argparse
    from tokenprovider import TokenProvider
    
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True)
    parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False)
    parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False)
    args = parser.parse_args()
    
    token_provider = TokenProvider()
    
    config = {
        'bootstrap.servers': args.bootstrap,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': token_provider.get_token,
    }
    
    producer = confluent_kafka.Producer(config)
    
    def callback(error, message):
        if error is not None:
            print(error)
            return
        print("Delivered a message to {}[{}]".format(message.topic(), message.partition()))
    
    for i in range(args.num_messages):
    
      message = f"{i} hello world!".encode('utf-8')
      producer.produce(args.topic_name, message, callback=callback)
    
    producer.flush()
    
  5. Sekarang Anda memerlukan implementasi penyedia token OAuth. Simpan kode berikut dalam file bernama tokenprovider.py:

    import base64
    import datetime
    import http.server
    import json
    import google.auth
    from google.auth.transport.urllib3 import Request
    import urllib3
    import time
    
    def encode(source):
      """Safe base64 encoding."""
      return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=')
    
    class TokenProvider(object):
      """
      Provides OAuth tokens from Google Cloud Application Default credentials.
      """
      HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'})
    
      def __init__(self, **config):
        self.credentials, _project = google.auth.default()
        self.http_client = urllib3.PoolManager()
    
      def get_credentials(self):
        if not self.credentials.valid:
          self.credentials.refresh(Request(self.http_client))
        return self.credentials
    
      def get_jwt(self, creds):
        token_data = dict(
          exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(),
          iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
          iss='Google',
          sub=creds.service_account_email
        )
        return json.dumps(token_data)
    
      def get_token(self, args):
        creds = self.get_credentials()
        token = '.'.join([
          encode(self.HEADER),
          encode(self.get_jwt(creds)),
          encode(creds.token)
        ])
    
        # compute expiry time
        expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc)
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        expiry_seconds = (expiry_utc - now_utc).total_seconds()
    
        return token, time.time() + expiry_seconds
    
  6. Anda kini siap menjalankan aplikasi:

    python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

Pembersihan

Agar akunAnda tidak dikenai biaya untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut. Google Cloud

Konsol

  1. Hapus instance VM.

    1. Buka halaman VM instances.

      Buka instance VM

    2. Pilih VM, lalu klik Delete.

  2. Hapus cluster Kafka.

    1. Buka halaman Managed Service untuk Apache Kafka > Cluster.

      Buka Cluster

    2. Pilih cluster Kafka, lalu klik Delete.

gcloud

  1. Untuk menghapus VM, gunakan gcloud compute instances delete perintah.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Untuk menghapus cluster Kafka, gunakan gcloud managed-kafka clusters delete perintah.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

Langkah berikutnya

Apache Kafka® adalah merek dagang terdaftar dari The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.