Mengembangkan aplikasi produsen Python
Pelajari cara mengembangkan aplikasi produsen Python yang melakukan autentikasi dengan cluster Layanan Terkelola untuk Apache Kafka menggunakan Kredensial Default Aplikasi (ADC). ADC memungkinkan aplikasi yang berjalan di Google Cloud menemukan dan menggunakan kredensial yang tepat secara otomatis untuk melakukan autentikasi ke layanan Google Cloud .
Sebelum memulai
Sebelum memulai tutorial ini, buat cluster Managed Service for Apache Kafka yang baru. Jika sudah memiliki cluster, Anda dapat melewati langkah ini.
Cara membuat cluster
Konsol
- Buka halaman Managed Service for Apache Kafka > Clusters.
- Klik Create.
- Di kotak Cluster name, masukkan nama untuk cluster.
- Dalam daftar Region, pilih lokasi untuk cluster.
-
Untuk Network configuration, konfigurasikan subnet tempat cluster dapat diakses:
- Untuk Project, pilih project Anda.
- Untuk Network, pilih jaringan VPC.
- Untuk Subnet, pilih subnet.
- Klik Done.
- Klik Create.
Setelah Anda mengklik Create, status cluster adalah Creating. Saat cluster
siap, statusnya adalah Active.
gcloud
Untuk membuat cluster Kafka, jalankan perintah
managed-kafka clusters
create.
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
Ganti kode berikut:
KAFKA_CLUSTER: nama untuk cluster KafkaREGION: lokasi clusterPROJECT_ID: project ID AndaSUBNET_NAME: subnet tempat Anda ingin membuat cluster, misalnyadefault
Untuk mengetahui informasi tentang lokasi yang didukung, lihat Lokasi Managed Service for Apache Kafka.
Perintah berjalan secara asinkron dan menampilkan ID operasi:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
Untuk melacak progres operasi pembuatan, gunakan perintah
gcloud managed-kafka
operations describe:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
Setelah cluster siap, output dari perintah ini akan menyertakan entri state:
ACTIVE. Untuk mengetahui informasi selengkapnya, lihat
Memantau
operasi pembuatan cluster.
Peran yang diperlukan
Agar mendapatkan izin yang diperlukan untuk membuat dan mengonfigurasi VM klien, minta administrator untuk memberi Anda peran IAM berikut di project:
-
Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) -
Project IAM Admin (
roles/resourcemanager.projectIamAdmin) -
Role Viewer (
roles/iam.roleViewer) -
Pengguna Akun Layanan (
roles/iam.serviceAccountUser)
Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
Menyiapkan VM klien
Buat instance virtual machine (VM) Linux di Compute Engine yang dapat mengakses cluster Kafka. Saat Anda mengonfigurasi VM, tetapkan opsi berikut:
Region. Buat VM di region yang sama dengan cluster Kafka Anda.
Subnet. Buat VM di jaringan VPC yang sama dengan subnet yang Anda gunakan dalam konfigurasi cluster Kafka. Untuk mengetahui informasi selengkapnya, lihat Melihat subnet cluster.
Cakupan akses. Tetapkan cakupan akses
https://www.googleapis.com/auth/cloud-platformke VM. Cakupan ini mengizinkan VM untuk mengirim permintaan ke Managed Kafka API.
Langkah-langkah berikut menunjukkan cara menyetel opsi ini.
Konsol
Di konsol Google Cloud , buka halaman Create an instance.
Di panel Machine configuration, lakukan hal berikut:
Di kolom Name, tentukan nama untuk instance Anda. Untuk mengetahui informasi selengkapnya, lihat Konvensi penamaan resource.
Di daftar Region, pilih region yang sama dengan cluster Kafka Anda.
Di daftar Zone, pilih zona.
Di menu navigasi, klik Networking. Di panel Networking yang muncul, lakukan hal berikut:
Buka bagian Network interfaces.
Untuk meluaskan antarmuka jaringan default, klik panah .
Di kolom Network, pilih jaringan VPC.
Dalam daftar Subnetwork, pilih subnet.
Klik Done.
Di menu navigasi, klik Security. Di panel Security yang muncul, lakukan hal berikut:
Di bagian Access scopes, pilih Set access for each API.
Dalam daftar cakupan akses, temukan menu drop-down Cloud Platform dan pilih Aktifkan.
Klik Create untuk membuat VM.
gcloud
Untuk membuat instance VM, gunakan perintah
gcloud compute instances create.
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
Ganti kode berikut:
- VM_NAME: nama VM
- PROJECT_ID: project ID Anda
- REGION: region tempat Anda membuat cluster Kafka, misalnya
us-central1 - SUBNET: subnet di jaringan VPC yang sama dengan subnet yang Anda gunakan dalam konfigurasi cluster
- ZONE: zona di region tempat Anda membuat
cluster, misalnya
us-central1-c
Untuk mengetahui informasi selengkapnya tentang cara membuat VM, lihat Membuat instance VM di subnet tertentu.
Memberikan peran IAM
Berikan peran Identity and Access Management (IAM) berikut ke akun layanan default Compute Engine:
- Managed Kafka Client (
roles/managedkafka.client) - Service Account Token Creator (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
Konsol
Di konsol Google Cloud , buka halaman IAM.
Temukan baris untuk akun layanan default Compute Engine, lalu klik Edit principal.
Klik Add another role, lalu pilih peran Managed Kafka Client. Ulangi langkah ini untuk peran Service Account Token Creator dan Service Account OpenID Token Creator.
Klik Simpan.
gcloud
Untuk memberikan peran IAM, gunakan perintah
gcloud projects add-iam-policy-binding.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
Ganti kode berikut:
PROJECT_ID: project ID Anda
PROJECT_NUMBER: nomor project Anda
Untuk mendapatkan nomor project, jalankan perintah
gcloud projects describe:
gcloud projects describe PROJECT_ID
Untuk mengetahui informasi selengkapnya, lihat Menemukan nama, nomor, dan ID project.
Terhubung ke VM
Gunakan SSH untuk terhubung ke instance VM.
Konsol
Buka halaman VM instances.
Dalam daftar instance VM, temukan nama VM, lalu klik SSH.
gcloud
Untuk terhubung ke VM, gunakan perintah
gcloud compute ssh.
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
Ganti kode berikut:
- VM_NAME: nama VM
- PROJECT_ID: project ID Anda
- ZONE: zona tempat Anda membuat VM
Konfigurasi tambahan mungkin diperlukan untuk penggunaan SSH pertama kali. Untuk mengetahui informasi selengkapnya, lihat Tentang koneksi SSH.
Membuat aplikasi produsen Python
Dari sesi SSH Anda, jalankan perintah berikut untuk membuat aplikasi produser.
Instal pip, pengelola paket Python, dan pengelola lingkungan virtual:
sudo apt install python3-pip -y sudo apt install python3-venv -yBuat lingkungan virtual (venv) baru dan aktifkan:
python3 -m venv kafka source kafka/bin/activateInstal klien
confluent-kafkadan dependensi lainnya:pip install confluent-kafka google-auth urllib3 packagingSalin kode klien produser berikut ke dalam file bernama
producer.pyimport confluent_kafka import argparse from tokenprovider import TokenProvider parser = argparse.ArgumentParser() parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True) parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False) parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False) args = parser.parse_args() token_provider = TokenProvider() config = { 'bootstrap.servers': args.bootstrap, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'OAUTHBEARER', 'oauth_cb': token_provider.get_token, } producer = confluent_kafka.Producer(config) def callback(error, message): if error is not None: print(error) return print("Delivered a message to {}[{}]".format(message.topic(), message.partition())) for i in range(args.num_messages): message = f"{i} hello world!".encode('utf-8') producer.produce(args.topic_name, message, callback=callback) producer.flush()Sekarang Anda memerlukan implementasi penyedia token OAuth. Simpan kode berikut dalam file bernama
tokenprovider.py:import base64 import datetime import http.server import json import google.auth from google.auth.transport.urllib3 import Request import urllib3 import time def encode(source): """Safe base64 encoding.""" return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=') class TokenProvider(object): """ Provides OAuth tokens from Google Cloud Application Default credentials. """ HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'}) def __init__(self, **config): self.credentials, _project = google.auth.default() self.http_client = urllib3.PoolManager() def get_credentials(self): if not self.credentials.valid: self.credentials.refresh(Request(self.http_client)) return self.credentials def get_jwt(self, creds): token_data = dict( exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(), iat=datetime.datetime.now(datetime.timezone.utc).timestamp(), iss='Google', sub=creds.service_account_email ) return json.dumps(token_data) def get_token(self, args): creds = self.get_credentials() token = '.'.join([ encode(self.HEADER), encode(self.get_jwt(creds)), encode(creds.token) ]) # compute expiry time expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc) now_utc = datetime.datetime.now(datetime.timezone.utc) expiry_seconds = (expiry_utc - now_utc).total_seconds() return token, time.time() + expiry_secondsSekarang Anda siap menjalankan aplikasi:
python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
Pembersihan
Agar akun Google Cloud Anda tidak dikenai biaya untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.
Konsol
Hapus instance VM.
Buka halaman VM instances.
Pilih VM, lalu klik Hapus.
Hapus cluster Kafka.
Buka halaman Managed Service for Apache Kafka > Clusters.
Pilih cluster Kafka, lalu klik Hapus.
gcloud
Untuk menghapus VM, gunakan perintah
gcloud compute instances delete.gcloud compute instances delete VM_NAME --zone=ZONEUntuk menghapus cluster Kafka, gunakan perintah
gcloud managed-kafka clusters delete.gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
Langkah berikutnya
Mengautentikasi klien Kafka dengan Managed Service for Apache Kafka.
Alat dan dokumentasi autentikasi Managed Service untuk Apache Kafka