Sviluppa un'applicazione di produzione Python
Scopri come sviluppare un'applicazione di produzione Python che esegue l'autenticazione con un cluster Managed Service per Apache Kafka utilizzando le credenziali predefinite dell'applicazione (ADC). ADC consente alle applicazioni in esecuzione su Google Cloud di trovare e utilizzare automaticamente le credenziali corrette per l'autenticazione ai Google Cloud servizi.
Prima di iniziare
Prima di iniziare questo tutorial, crea un nuovo cluster Managed Service per Apache Kafka. Se hai già un cluster, puoi saltare questo passaggio.
Come creare un cluster
Console
- Vai alla pagina Managed Service per Apache Kafka > Cluster.
- Fai clic su Crea.
- Nella casella Nome cluster, inserisci un nome per il cluster.
- Nell'elenco Regione, seleziona una località per il cluster.
-
Per Configurazione di rete, configura la subnet in cui il cluster è accessibile:
- In Progetto, seleziona il tuo progetto.
- In Rete, seleziona la rete VPC.
- In Subnet, seleziona la subnet.
- Fai clic su Fine.
- Fai clic su Crea.
Dopo aver fatto clic su Crea, lo stato del cluster è Creating. Quando il cluster
è pronto, lo stato è Active.
gcloud
Per creare un cluster Kafka, esegui il
managed-kafka clusters
create comando.
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
Sostituisci quanto segue:
KAFKA_CLUSTER: un nome per il cluster KafkaREGION: la località del clusterPROJECT_ID: il tuo ID progettoSUBNET_NAME: la subnet in cui vuoi creare il cluster, ad esempiodefault
Per informazioni sulle località supportate, consulta Località di Managed Service per Apache Kafka.
Il comando viene eseguito in modo asincrono e restituisce un ID operazione:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
Per monitorare l'avanzamento dell'operazione di creazione, utilizza il
gcloud managed-kafka
operations describe comando:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
Quando il cluster è pronto, l'output di questo comando include la voce state:
ACTIVE. Per saperne di più, consulta
Monitorare l'operazione di creazione del cluster.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per creare e configurare una VM client, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:
- Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) - Project IAM Admin (
roles/resourcemanager.projectIamAdmin) - Role Viewer (
roles/iam.roleViewer) - Utente Service Account (
roles/iam.serviceAccountUser)
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Configura una VM client
Crea un'istanza di macchina virtuale (VM) Linux in Compute Engine che possa accedere al cluster Kafka. Quando configuri la VM, imposta le seguenti opzioni:
Regione. Crea la VM nella stessa regione del cluster Kafka.
Subnet. Crea la VM nella stessa rete VPC della subnet utilizzata nella configurazione del cluster Kafka. Per saperne di più, consulta Visualizzare le subnet di un cluster.
Ambiti di accesso. Assegna l'
https://www.googleapis.com/auth/cloud-platformambito di accesso alla VM. Questo ambito autorizza la VM a inviare richieste all'API Managed Kafka.
I passaggi seguenti mostrano come impostare queste opzioni.
Console
Nella Google Cloud console, vai alla pagina Crea un'istanza.
Nel riquadro Configurazione macchina, segui questi passaggi:
Nel campo Nome, specifica un nome per l'istanza. Per ulteriori informazioni, consulta le convenzioni per la denominazione delle risorse.
Nell'elenco Regione, seleziona la stessa regione del cluster Kafka.
Nell'elenco Zona, seleziona una zona.
Nel menu di navigazione, fai clic su Networking. Nel riquadro Networking visualizzato, segui questi passaggi:
Vai alla sezione Interfacce di rete.
Per espandere l'interfaccia di rete predefinita, fai clic sulla freccia.
Nel campo Rete, scegli la rete VPC.
Nell'elenco Subnet, seleziona la subnet.
Fai clic su Fine.
Nel menu di navigazione, fai clic su Sicurezza. Nel riquadro Sicurezza visualizzato, segui questi passaggi:
Per Ambiti di accesso, seleziona Imposta l'accesso per ogni API.
Nell'elenco degli ambiti di accesso, trova l'elenco a discesa Google Cloud e seleziona Attivato.
Fai clic su Crea per creare la VM.
gcloud
Per creare l'istanza VM, utilizza il
gcloud compute instances create
comando.
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
Sostituisci quanto segue:
- VM_NAME: il nome della VM
- PROJECT_ID: il tuo ID progetto
- REGION: la regione in cui hai creato il cluster Kafka, ad esempio
us-central1 - SUBNET: una subnet nella stessa rete VPC della subnet utilizzata nella configurazione del cluster
- ZONE: una zona nella regione in cui hai creato il
cluster, ad esempio
us-central1-c
Per saperne di più sulla creazione di una VM, consulta Crea un'istanza VM in una subnet specifica.
Concedi ruoli IAM
Concedi i seguenti ruoli Identity and Access Management (IAM) al service account predefinito di Compute Engine:
- Managed Kafka Client (
roles/managedkafka.client) - Creatore token account di servizio (
roles/iam.serviceAccountTokenCreator) Creatore token OpenID account di servizio (
roles/iam.serviceAccountOpenIdTokenCreator)
Console
Nella Google Cloud console vai alla pagina IAM.
Trova la riga relativa a account di servizio predefinito Compute Engine e fai clic Modifica entità.
Fai clic su Aggiungi un altro ruolo e seleziona il ruolo Managed Kafka Client. Ripeti questo passaggio per i ruoli Creatore token account di servizio e Creatore token OpenID account di servizio.
Fai clic su Salva.
gcloud
Per concedere i ruoli IAM, utilizza il
gcloud projects add-iam-policy-binding
comando.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
Sostituisci quanto segue:
PROJECT_ID: il tuo ID progetto
PROJECT_NUMBER: il numero del progetto
Per ottenere il numero di progetto, esegui il
gcloud projects describe comando:
gcloud projects describe PROJECT_ID
Per saperne di più, consulta Trovare il nome, il numero e l'ID del progetto.
Connettiti alla VM
Utilizza SSH per connetterti all'istanza VM.
Console
Vai alla pagina Istanze VM.
Nell'elenco delle istanze VM, trova il nome della VM e fai clic su SSH.
gcloud
Per connetterti alla VM, utilizza il
gcloud compute ssh comando.
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
Sostituisci quanto segue:
- VM_NAME: il nome della VM
- PROJECT_ID: il tuo ID progetto
- ZONE: la zona in cui hai creato la VM
Potrebbe essere necessaria una configurazione aggiuntiva per il primo utilizzo di SSH. Per saperne di più, consulta Informazioni sulle connessioni SSH.
Crea un'applicazione di produzione Python
Dalla sessione SSH, esegui i seguenti comandi per creare un'applicazione di produzione.
Installa pip, un gestore di pacchetti Python e il gestore di ambienti virtuali:
sudo apt install python3-pip -y sudo apt install python3-venv -yCrea un nuovo ambiente virtuale (venv) e attivalo:
python3 -m venv kafka source kafka/bin/activateInstalla il client
confluent-kafkae altre dipendenze:pip install confluent-kafka google-auth urllib3 packagingCopia il seguente codice client di produzione in un file denominato
producer.pyimport confluent_kafka import argparse from tokenprovider import TokenProvider parser = argparse.ArgumentParser() parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True) parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False) parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False) args = parser.parse_args() token_provider = TokenProvider() config = { 'bootstrap.servers': args.bootstrap, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'OAUTHBEARER', 'oauth_cb': token_provider.get_token, } producer = confluent_kafka.Producer(config) def callback(error, message): if error is not None: print(error) return print("Delivered a message to {}[{}]".format(message.topic(), message.partition())) for i in range(args.num_messages): message = f"{i} hello world!".encode('utf-8') producer.produce(args.topic_name, message, callback=callback) producer.flush()Ora ti serve un'implementazione del provider di token OAuth. Salva il seguente codice in un file denominato
tokenprovider.py:import base64 import datetime import http.server import json import google.auth from google.auth.transport.urllib3 import Request import urllib3 import time def encode(source): """Safe base64 encoding.""" return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=') class TokenProvider(object): """ Provides OAuth tokens from Google Cloud Application Default credentials. """ HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'}) def __init__(self, **config): self.credentials, _project = google.auth.default() self.http_client = urllib3.PoolManager() def get_credentials(self): if not self.credentials.valid: self.credentials.refresh(Request(self.http_client)) return self.credentials def get_jwt(self, creds): token_data = dict( exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(), iat=datetime.datetime.now(datetime.timezone.utc).timestamp(), iss='Google', sub=creds.service_account_email ) return json.dumps(token_data) def get_token(self, args): creds = self.get_credentials() token = '.'.join([ encode(self.HEADER), encode(self.get_jwt(creds)), encode(creds.token) ]) # compute expiry time expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc) now_utc = datetime.datetime.now(datetime.timezone.utc) expiry_seconds = (expiry_utc - now_utc).total_seconds() return token, time.time() + expiry_secondsOra puoi eseguire l'applicazione:
python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
Libera spazio
Per evitare che al tuo Google Cloud account vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.
Console
Elimina l'istanza VM.
Vai alla pagina Istanze VM.
Seleziona la VM e fai clic su Elimina.
Elimina il cluster Kafka.
Vai alla pagina Managed Service per Apache Kafka > Cluster.
Seleziona il cluster Kafka e fai clic su Elimina.
gcloud
Per eliminare la VM, utilizza il
gcloud compute instances deletecomando.gcloud compute instances delete VM_NAME --zone=ZONEPer eliminare il cluster Kafka, utilizza il
gcloud managed-kafka clusters deletecomando.gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
Passaggi successivi
Autentica i client Kafka con Managed Service per Apache Kafka.
Strumenti e documentazione di autenticazione Managed Service per Apache Kafka