Desenvolver um aplicativo produtor em Python
Saiba como desenvolver um aplicativo produtor em Python que faz a autenticação com um cluster do serviço gerenciado para Apache Kafka usando as Credenciais padrão de aplicativo (ADC, na sigla em inglês). O ADC permite que aplicativos executados no Google Cloud encontrem e usem automaticamente as credenciais certas para autenticação nos serviços do Google Cloud .
Antes de começar
Antes de iniciar este tutorial, crie um cluster do serviço gerenciado para Apache Kafka. Se você já tiver um cluster, pule esta etapa.
Como criar um cluster
Console
- Acesse a página Serviço Gerenciado para Apache Kafka > Clusters.
- Clique em Criar.
- Na caixa Nome do cluster, insira um nome para o cluster.
- Na lista Região, selecione um local para o cluster.
-
Em Configuração de rede, configure a sub-rede em que o cluster está acessível:
- Em Projeto, selecione o projeto.
- Em Rede, selecione a rede VPC.
- Em Sub-rede, selecione a sub-rede.
- Clique em Concluído.
- Clique em Criar.
Depois de clicar em Criar, o estado do cluster será Creating. Quando o cluster estiver pronto, o estado será Active.
gcloud
Para criar um cluster do Kafka, execute o comando
managed-kafka clusters
create.
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
Substitua:
KAFKA_CLUSTER: um nome para o cluster do KafkaREGION: o local do clusterPROJECT_ID: ID do projeto;SUBNET_NAME: a sub-rede em que você quer criar o cluster, por exemplo,default
Para informações sobre os locais aceitos, consulte Locais do serviço gerenciado para Apache Kafka.
O comando é executado de forma assíncrona e retorna um ID de operação:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
Para acompanhar o progresso da operação de criação, use o comando
gcloud managed-kafka
operations describe:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
Quando o cluster estiver pronto, a saída desse comando vai incluir a entrada state:
ACTIVE. Para mais informações, consulte
Monitorar a
operação de criação de cluster.
Funções exigidas
Para ter as permissões necessárias para criar e configurar uma VM cliente, peça ao administrador para conceder a você os seguintes papéis do IAM no projeto:
-
Administrador da instância do Compute (v1) (
roles/compute.instanceAdmin.v1) -
Administrador de projetos do IAM (
roles/resourcemanager.projectIamAdmin) -
Leitor de papel (
roles/iam.roleViewer) -
Usuário da conta de serviço (
roles/iam.serviceAccountUser)
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.
Configurar uma VM cliente
Crie uma instância de máquina virtual (VM) do Linux no Compute Engine que possa acessar o cluster do Kafka. Ao configurar a VM, defina as seguintes opções:
Region. Crie a VM na mesma região do cluster do Kafka.
Sub-rede. Crie a VM na mesma rede VPC da sub-rede usada na configuração do cluster do Kafka. Para mais informações, consulte Ver as sub-redes de um cluster.
Escopos de acesso. Atribua o escopo de acesso
https://www.googleapis.com/auth/cloud-platformà VM. Esse escopo autoriza a VM a enviar solicitações para a API Managed Kafka.
As etapas a seguir mostram como definir essas opções.
Console
No console do Google Cloud , acesse a página Criar uma instância.
No painel Configuração da máquina, faça o seguinte:
No campo Nome, especifique um nome para a instância. Para mais informações, consulte Convenção de nomenclatura de recursos.
Na lista Região, selecione a mesma região do cluster do Kafka.
Na lista Zona, selecione uma zona.
No menu de navegação, clique em Rede. No painel Rede que aparece, faça o seguinte:
Acesse a seção Interfaces de rede.
Para expandir a interface de rede padrão, clique na seta .
No campo Rede, escolha a rede VPC.
Na lista Sub-rede, selecione a sub-rede.
Clique em Concluído.
No menu de navegação, clique em Segurança. No painel Segurança que aparece, faça o seguinte:
Para Escopos de acesso, selecione Definir acesso para cada API.
Na lista de escopos de acesso, encontre a lista suspensa Cloud Platform e selecione Ativado.
Clique em Criar para criar a VM.
gcloud
Para criar a instância de VM, use o comando
gcloud compute instances create.
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
Substitua:
- VM_NAME: o nome da VM
- PROJECT_ID: ID do projeto;
- REGION: a região em que você criou o cluster do Kafka, por exemplo,
us-central1. - SUBNET: uma sub-rede na mesma rede VPC que a usada na configuração do cluster
- ZONE: uma zona na região em que você criou o
cluster, por exemplo,
us-central1-c
Para mais informações sobre como criar uma VM, consulte Criar uma instância de VM em uma sub-rede específica.
Conceder papéis do IAM
Conceda os seguintes papéis do Identity and Access Management (IAM) à conta de serviço padrão do Compute Engine:
- Cliente Kafka gerenciado (
roles/managedkafka.client) - Criador do token da conta de serviço (
roles/iam.serviceAccountTokenCreator) Criador do token do OpenID da conta de serviço (
roles/iam.serviceAccountOpenIdTokenCreator)
Console
No console do Google Cloud , acesse a página IAM.
Encontre a linha da conta de serviço padrão do Compute Engine e clique em Editar principal.
Clique em Adicionar outro papel e selecione Cliente gerenciado do Kafka. Repita essa etapa para os papéis Criador de token da conta de serviço e Criador de token do OpenID da conta de serviço.
Clique em Salvar.
gcloud
Para conceder papéis do IAM, use o comando
gcloud projects add-iam-policy-binding.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
Substitua:
PROJECT_ID: ID do projeto;
PROJECT_NUMBER: o ID do seu projeto
Para saber o número do projeto, execute o comando
gcloud projects describe:
gcloud projects describe PROJECT_ID
Para mais informações, consulte Encontrar o nome, o número e o ID do projeto.
Conectar-se à VM
Use SSH para se conectar à instância de VM.
Console
Acesse a página Instâncias de VM.
Na lista de instâncias de VM, encontre o nome da VM e clique em SSH.
gcloud
Para se conectar à VM, use o
comando gcloud compute ssh.
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
Substitua:
- VM_NAME: o nome da VM
- PROJECT_ID: ID do projeto;
- ZONE: a zona em que você criou a VM
Talvez seja necessário fazer mais configurações para usar o SSH pela primeira vez. Para mais informações, consulte Sobre conexões SSH.
Criar um aplicativo produtor em Python
Na sessão SSH, execute os comandos a seguir para criar um aplicativo produtor.
Instale o pip, um gerenciador de pacotes do Python e o gerenciador de ambiente virtual:
sudo apt install python3-pip -y sudo apt install python3-venv -yCrie e ative um novo ambiente virtual (venv):
python3 -m venv kafka source kafka/bin/activateInstale o cliente
confluent-kafkae outras dependências:pip install confluent-kafka google-auth urllib3 packagingCopie o código do cliente produtor a seguir em um arquivo chamado
producer.pyimport confluent_kafka import argparse from tokenprovider import TokenProvider parser = argparse.ArgumentParser() parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True) parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False) parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False) args = parser.parse_args() token_provider = TokenProvider() config = { 'bootstrap.servers': args.bootstrap, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'OAUTHBEARER', 'oauth_cb': token_provider.get_token, } producer = confluent_kafka.Producer(config) def callback(error, message): if error is not None: print(error) return print("Delivered a message to {}[{}]".format(message.topic(), message.partition())) for i in range(args.num_messages): message = f"{i} hello world!".encode('utf-8') producer.produce(args.topic_name, message, callback=callback) producer.flush()Agora você precisa de uma implementação do provedor de token OAuth. Salve o código a seguir em um arquivo chamado
tokenprovider.py:import base64 import datetime import http.server import json import google.auth from google.auth.transport.urllib3 import Request import urllib3 import time def encode(source): """Safe base64 encoding.""" return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=') class TokenProvider(object): """ Provides OAuth tokens from Google Cloud Application Default credentials. """ HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'}) def __init__(self, **config): self.credentials, _project = google.auth.default() self.http_client = urllib3.PoolManager() def get_credentials(self): if not self.credentials.valid: self.credentials.refresh(Request(self.http_client)) return self.credentials def get_jwt(self, creds): token_data = dict( exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(), iat=datetime.datetime.now(datetime.timezone.utc).timestamp(), iss='Google', sub=creds.service_account_email ) return json.dumps(token_data) def get_token(self, args): creds = self.get_credentials() token = '.'.join([ encode(self.HEADER), encode(self.get_jwt(creds)), encode(creds.token) ]) # compute expiry time expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc) now_utc = datetime.datetime.now(datetime.timezone.utc) expiry_seconds = (expiry_utc - now_utc).total_seconds() return token, time.time() + expiry_secondsAgora você já pode executar o aplicativo:
python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
Limpar
Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, siga as etapas abaixo.
Console
Exclua a instância de VM.
Acesse a página Instâncias de VM.
Selecione a VM e clique em Excluir.
Exclua o cluster do Kafka.
Acesse a página Serviço gerenciado para Apache Kafka > Clusters.
Selecione o cluster do Kafka e clique em Excluir.
gcloud
Para excluir a VM, use o comando
gcloud compute instances delete.gcloud compute instances delete VM_NAME --zone=ZONEPara excluir o cluster do Kafka, use o comando
gcloud managed-kafka clusters delete.gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
A seguir
Autenticar clientes do Kafka com o Serviço gerenciado para Apache Kafka.
Ferramentas e documentação de autenticação do Serviço gerenciado para Apache Kafka