Este guia descreve como criar e implantar um pipeline de orquestração na extensão do Google Cloud Data Agent Kit para Visual Studio Code.
O pipeline de exemplo executa um script do PySpark no Serviço Gerenciado para Apache Spark.
É possível implantar pipelines de orquestração no VS Code como versões locais ou por uma ação do GitHub, como ao mesclar mudanças na ramificação main. Este documento demonstra como implantar a versão local de um pipeline de orquestração.
Antes de começar
Antes de começar, faça o seguinte:
- Instale a extensão do Data Agent Kit para VS Code.
- Defina as configurações.
- Adicione um repositório do GitHub ao seu espaço de trabalho do VS Code para armazenar pipelines de orquestração e recursos, como scripts.
Analise os papéis necessários do IAM
Para obter as permissões para criar recursos no seu projeto, implantar e executar pipelines de orquestração, peça ao administrador para conceder os papéis necessários.
Para criar e gerenciar ambientes do Serviço Gerenciado para Apache Airflow e gerenciar objetos nos buckets associados, você precisa dos seguintes papéis. Para mais informações sobre essas funções de usuário, consulte Conceder papéis aos usuários na documentação do Serviço Gerenciado para Apache Airflow.
- Administrador de objetos do ambiente e do Storage (composer.environmentAndStorageObjectAdmin)
- Usuário da conta de serviço (
iam.serviceAccountUser)
Para trabalhar com recursos do BigQuery e do Cloud Storage, você precisa dos seguintes papéis.
- Editor de dados do BigQuery (
roles/bigquery.dataEditor) - Administrador do objeto do Storage (
roles/storage.objectAdmin)
Dependendo dos recursos que você planeja acessar, talvez sejam necessários outros papéis além daqueles que permitem usar a extensão e trabalhar com pipelines de orquestração.
Criar uma conta de serviço e conceder papéis do IAM
Use uma conta de serviço exclusiva para o ambiente do Airflow Gerenciado de terceira geração. A conta de serviço cria um ambiente do Airflow Gerenciado de terceira geração e executa todos os pipelines de orquestração implantados.
Peça para o administrador concluir as etapas a seguir:
- Crie uma conta de serviço conforme descrito na documentação do IAM.
- Conceda o papel Worker do Composer (
composer.worker) à conta de serviço. Esse papel fornece as permissões necessárias na maioria dos casos.
Como prática recomendada, se você precisar acessar outros recursos no seu Google Cloud projeto, conceda permissões adicionais a essa conta de serviço somente quando necessário para a operação do pipeline de orquestração.
Criar Google Cloud recursos para o pipeline de orquestração
Nesta etapa, crie Google Cloud recursos para o pipeline de orquestração.
Criar um ambiente do Airflow Gerenciado de terceira geração
Crie um ambiente do Airflow Gerenciado de terceira geração com a seguinte configuração:
- Nome do ambiente: insira um nome que será usado mais tarde para configurar
o pipeline de orquestração. Por exemplo,
example-pipeline-scheduler. - Local: selecione um local. Recomendamos criar todos os recursos neste guia no mesmo local. Por exemplo,
us-central1. - Conta de serviço: selecione a conta de serviço criada para esse ambiente.
O exemplo de comando da Google Cloud CLI a seguir demonstra a sintaxe:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
Adicionar parâmetros de ambiente à configuração do programador
Forneça detalhes de conexão para o ambiente do Airflow Gerenciado que vai executar o pipeline de orquestração.
Adicione os parâmetros de configuração do ambiente criado usando o editor de configurações do Google Cloud Data Agent Kit:
- Clique no ícone Google Cloud Data Agent Kit na barra de atividades.
- Expanda Configurações e clique em Configurações.
- Selecione Programador.
- Insira os parâmetros do ambiente do Airflow Gerenciado de terceira geração criado anteriormente:
- ID do projeto: nome do projeto em que o ambiente está localizado.
Exemplo:
example-project. - Região: região em que o ambiente está localizado. Exemplo:
us-central1. - Ambiente: nome do ambiente. Exemplo:
example-pipeline-scheduler.
- ID do projeto: nome do projeto em que o ambiente está localizado.
Exemplo:
- Clique em Salvar.
Criar um bucket para artefatos de pipeline
Crie um bucket do Cloud Storage no mesmo projeto que o
ambiente do Airflow Gerenciado e dê a ele um nome semelhante a
example-pipelines-bucket. Esse bucket é necessário para armazenar o job do Serviço Gerenciado para Apache Spark.
Algumas ações de pipeline, como a saída dos resultados para um bucket do Cloud Storage.
Criar um conjunto de dados e uma tabela no BigQuery
Este guia demonstra um pipeline que grava dados em uma tabela do BigQuery. Crie os seguintes recursos do BigQuery no seu projeto:
- Crie um novo conjunto de dados chamado
wordcount_dataset. - Crie uma nova tabela do BigQuery chamada
wordcount_output.
Adicionar recursos de pipeline
Este guia demonstra uma tarefa comum de engenharia de dados (ETL: extrair, transformar, carregar) usando o PySpark, lendo do BigQuery, transformando os dados (contagem de palavras) e carregando-os de volta no BigQuery.
Não agênticos
Adicione o arquivo a seguir à pasta /scripts do seu repositório. Mais tarde, você adicionará uma ação de pipeline que executa esse script no Serviço Gerenciado para Apache Spark.
Exemplo de arquivo wordcount.py:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
Substitua:
- ARTIFACTS_BUCKET_NAME: o nome do bucket do Cloud Storage
criado anteriormente. Exemplo:
example-pipelines-bucket. - PROJECT_ID: o nome do projeto em que o ambiente
reside. Exemplo:
example-project.
Agênticos
Peça ao agente para gerar um script de exemplo do PySpark na pasta /scripts do seu repositório. Mais tarde, você adicionará uma ação de pipeline que executa esse script no Serviço Gerenciado para Apache Spark.
Insira um comando semelhante ao seguinte:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
Inicializar pipelines de orquestração no repositório
Ao inicializar pipelines de orquestração, a extensão do Data Agent Kit para VS Code cria um scaffolding que inclui o seguinte:
- Um arquivo YAML de pipeline de orquestração: um exemplo de definição de pipeline que contém uma programação, mas nenhuma ação definida.
deployment.yaml: um exemplo de configuração de implantação de pipeline que define como o pipeline precisa ser implantado. Esse arquivo demonstra a configuração necessária para o ambiente do Airflow Gerenciado, o bucket de artefatos e todos os outros recursos usados pelas ações do pipeline..github/workflows/deploy.yaml: configura uma ação do GitHub que implanta o pipeline quando você mescla mudanças na ramificaçãomaindo seu repositório do GitHub..github/workflows/validate.yaml: configura uma ação do GitHub que valida o pipeline após a implantação.
Nas etapas posteriores deste documento, você vai expandir essas definições usando a extensão do Data Agent Kit para VS Code para criar e implantar um pipeline de orquestração localmente.
Não agênticos
Para inicializar pipelines de orquestração, faça o seguinte:
- Clique no ícone Google Cloud Data Agent Kit na barra de atividades.
- Expanda Engenharia de dados e clique em Inicializar orquestração pipeline.
- Insira os parâmetros do novo pipeline de orquestração:
- ID do pipeline: insira o ID do pipeline. Exemplo:
example-pipeline. - ID do projeto na nuvem do Google Cloud: o nome do projeto em que o ambiente reside. Exemplo:
example-project. - Região: a região em que o ambiente reside. Exemplo:
us-central1. - ID do ambiente: o nome do ambiente em que você quer desenvolver.
Exemplo:
dev/staging. Ambiente do Serviço Gerenciado para Apache Airflow do programador: o nome do ambiente em que você quer orquestrar seus pipelines. Para este documento, especifique o mesmo ambiente nesse parâmetro.
Bucket de artefatos: o nome do bucket usado para artefatos de pipeline, sem o prefixo
gs://. Exemplo:example-pipelines-bucket.Clique em Próxima.
Clique em Inicializar.
Especifique um espaço de trabalho em que você quer que o pipeline seja inicializado.
Agênticos
Peça ao agente para criar um scaffolding para pipelines de orquestração do seu repositório.
Insira um comando semelhante ao seguinte:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
Depois de inicializar pipelines no repositório, não é possível fazer isso novamente, porque o novo scaffolding substituiria todas as mudanças de configuração feitas. É possível adicionar novos pipelines criando novos arquivos de definição de pipeline no projeto e adicionando-os à configuração de implantação.
Adicionar uma nova tarefa ao pipeline
Como a configuração inicial do pipeline não tem ações, adicione uma ação que executa o script do PySpark.
Não agênticos
Para editar um pipeline, faça o seguinte:
- Clique no ícone Google Cloud Data Agent Kit na barra de atividades.
- Expanda Engenharia de dados e Orquestração de Pipelines.
- Selecione
example-pipeline.yaml. Um editor de pipeline será aberto para o pipeline selecionado. - Opcional: selecione o nó Acionador de programação. É possível ajustar a programação do pipeline especificando uma expressão semelhante a cron e horários de início e término da programação. A programação padrão para o pipeline recém-inicializado é
0 2 * * *, que é executada às 2h diariamente.
Adicione uma nova tarefa. Neste guia, você adiciona uma tarefa do PySpark que executa um script do PySpark adicionado anteriormente:
- Clique em Adicionar primeira tarefa para incluir um novo nó de tarefa.
- Selecione Executar script do PySpark e o arquivo
script/wordcount.py.
O painel Executar script do PySpark será aberto.
- No modo de cluster do Spark, selecione Spark sem servidor.
- Em Local, especifique o local em que o ambiente reside.
Exemplo:
us-central1. - Clique em Salvar.
Agênticos
Execute o seguinte comando:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
Implantar a versão local do pipeline
Implante a versão local do pipeline para confirmar se ela está configurada corretamente.
Ao implantar uma versão local do pipeline de orquestração, a extensão do Data Agent Kit para VS Code faz o upload de uma versão local do pacote de pipeline para o ambiente do Airflow Gerenciado e a executa. A implantação local é destinada ao uso ao trabalhar em um ambiente de desenvolvimento.
O comando de implantação implanta uma programação não pausada. Para evitar isso, pause a programação manualmente no painel de gerenciamento de pipelines. Também é possível editar o arquivo YAML do pipeline para comentar ou remover o bloco triggers: - schedule.
Não agênticos
Para implantar uma versão local do pipeline de orquestração de exemplo, faça o seguinte:
- Clique no ícone Google Cloud Data Agent Kit na barra de atividades.
- Expanda Engenharia de dados e Orquestração de Pipelines.
- Selecione
example-pipeline.yaml. Um editor de pipeline será aberto para o pipeline selecionado. - Selecione Executar pipeline e selecione o ambiente de desenvolvimento ou preparo criado anteriormente.
Agênticos
Execute o seguinte comando:
Deploy my pipeline
Monitorar a execução do pipeline e verificar os registros de execução
Depois que o pipeline for implantado, você poderá conferir as informações detalhadas, o histórico de execuções de pipeline e os registros de execução do pipeline:
- Clique no ícone Google Cloud Data Agent Kit na barra de atividades.
- Expanda Engenharia de dados e selecione Gerenciamento de pipelines.
- Clique no nome do pipeline (
example-pipeline) para conferir o histórico de execução. Na lista de execuções para uma data específica, é possível conferir execuções de pipeline individuais e o detalhamento de ações individuais em cada execução de pipeline. - Clique em um ID de tarefa para conferir os registros de execução da tarefa. Como o script de exemplo do PySpark foi executado no Serviço Gerenciado para Apache Spark, os registros de tarefas terão um link para os registros em lote.
Solucionar problemas e corrigir falhas de pipeline
Quando o pipeline falha, um botão Diagnosticar aparece no painel Gerenciamento de pipelines.
Agênticos
Quando você clica no botão Diagnosticar, o agente gera um comando para solucionar a falha do pipeline. O comando é copiado para a área de transferência ou aberto em uma nova sessão de chat.
O agente usa habilidades especializadas para solucionar problemas de pipelines, com foco na coleta de registros, na verificação cruzada do código implantado e do espaço de trabalho e na geração de uma análise de causa raiz (RCA).
As possíveis próximas etapas após receber a RCA são as seguintes:
- Aplique a análise de causa raiz no espaço de trabalho atual.
- Peça ao agente para criar uma nova ramificação e aplicar as mudanças nela.
- Abra um tíquete do Cloud Customer Care com os detalhes da RCA.
Para receber ajuda na solução de problemas com a extensão, consulte Solução de problemas.