Este guia descreve como criar e implantar um pipeline de orquestração na extensão do Google Cloud Data Agent Kit para Visual Studio Code.
O exemplo de pipeline executa um script do PySpark no Serviço Gerenciado para Apache Spark.
É possível implantar pipelines de orquestração do VS Code como
versões locais ou por uma ação do GitHub, como ao mesclar mudanças
na ramificação main. Neste documento, mostramos como implantar a versão local de um pipeline de orquestração.
Antes de começar
Antes de começar, faça o seguinte:
- Instale a extensão do Data Agent Kit para VS Code.
- Defina suas configurações.
- Adicione um repositório do GitHub ao seu espaço de trabalho do VS Code para armazenar pipelines de orquestração e recursos, como scripts.
Revisar os papéis necessários do IAM
Para obter as permissões para criar recursos no seu projeto, implantar e executar pipelines de orquestração, peça ao seu administrador para conceder a você os papéis necessários.
Para criar e gerenciar ambientes do Serviço Gerenciado para Apache Airflow e gerenciar objetos nos buckets associados, você precisa dos seguintes papéis. Para mais informações sobre essas funções de usuário, consulte Conceder papéis aos usuários na documentação do Serviço Gerenciado para Apache Airflow.
- Administrador de objetos do ambiente e do Storage (composer.environmentAndStorageObjectAdmin)
- Usuário da conta de serviço
(
iam.serviceAccountUser)
Para trabalhar com recursos do BigQuery e do Cloud Storage, você precisa das seguintes funções.
- Editor de dados do BigQuery (
roles/bigquery.dataEditor) - Administrador do objeto do Storage (
roles/storage.objectAdmin)
Dependendo dos recursos que você planeja acessar, talvez sejam necessários outros papéis além daqueles que permitem usar a extensão e trabalhar com pipelines de orquestração.
Criar uma conta de serviço e conceder papéis do IAM
Use uma conta de serviço exclusiva para o ambiente do Airflow gerenciado de terceira geração. A conta de serviço cria um ambiente do Airflow gerenciado de terceira geração e executa todos os pipelines de orquestração implantados.
Peça para o administrador seguir estas etapas:
- Crie uma conta de serviço conforme descrito na documentação do IAM.
- Conceda o papel Worker do Composer (
composer.worker) à conta de serviço. Esse papel fornece as permissões necessárias na maioria dos casos.
Como prática recomendada, se você precisar acessar outros recursos no seuGoogle Cloud projeto, conceda permissões adicionais a essa conta de serviço somente quando necessário para a operação do pipeline de orquestração.
Criar recursos Google Cloud para o pipeline de orquestração
Nesta etapa, crie recursos Google Cloud para seu pipeline de orquestração.
Criar um ambiente do Airflow gerenciado de terceira geração
Crie um ambiente do Airflow gerenciado de terceira geração com a seguinte configuração:
- Nome do ambiente: insira um nome que você vai usar mais tarde para configurar
o pipeline de orquestração. Por exemplo,
example-pipeline-scheduler. - Local: selecione um local. Recomendamos criar todos os recursos deste guia no mesmo local. Por exemplo,
us-central1. - Conta de serviço: selecione a conta de serviço que você criou para este ambiente.
O exemplo de comando da Google Cloud CLI a seguir demonstra a sintaxe:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
Adicionar parâmetros de ambiente à configuração do programador
Forneça detalhes da conexão para o ambiente gerenciado do Airflow que vai executar seu pipeline de orquestração.
Adicione os parâmetros de configuração do ambiente criado usando o editor de configurações do Kit de agente de dados do Google Cloud:
- Clique no ícone Kit do agente de dados do Google Cloud na barra de atividades.
- Abra Configurações e clique em Configurações.
- Selecione Programador.
- Insira os parâmetros do ambiente do Airflow gerenciado de terceira geração que você criou antes:
- ID do projeto: nome do projeto em que o ambiente está localizado.
Exemplo:
example-project. - Região: região em que o ambiente está localizado. Por exemplo:
us-central1. - Ambiente: nome do ambiente. Por exemplo:
example-pipeline-scheduler.
- ID do projeto: nome do projeto em que o ambiente está localizado.
Exemplo:
- Clique em Salvar.
Criar um bucket para artefatos de pipeline
Crie um bucket do Cloud Storage no mesmo projeto do ambiente do Airflow Gerenciado e dê a ele um nome semelhante a example-pipelines-bucket. Esse bucket é necessário para armazenar seu
job do Serviço Gerenciado para Apache Spark.
Algumas ações de pipeline, como a saída dos resultados para um bucket do Cloud Storage.
Criar um conjunto de dados e uma tabela no BigQuery
Este guia demonstra um pipeline que grava dados em uma tabela do BigQuery. Crie os seguintes recursos do BigQuery no seu projeto:
- Crie um conjunto de dados chamado
wordcount_dataset. - Crie uma tabela do BigQuery chamada
wordcount_output.
Adicionar recursos de pipeline
Este guia demonstra uma tarefa comum de engenharia de dados (ETL: extração, transformação e carregamento) usando o PySpark, lendo do BigQuery, transformando os dados (contagem de palavras) e carregando-os de volta no BigQuery.
Não agêntica
Adicione o seguinte arquivo à pasta /scripts do seu repositório. Depois, você adiciona uma ação de pipeline que executa esse script no Serviço Gerenciado para Apache Spark.
Exemplo de arquivo wordcount.py:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
Substitua:
- ARTIFACTS_BUCKET_NAME: o nome do bucket do Cloud Storage
criado anteriormente. Exemplo:
example-pipelines-bucket. - PROJECT_ID: o nome do projeto em que o ambiente
está localizado. Exemplo:
example-project.
Agênticos
Peça ao agente para gerar um exemplo de script do PySpark na pasta /scripts do seu repositório. Depois, você adiciona uma ação de pipeline que executa esse script no Serviço Gerenciado para Apache Spark.
Insira um comando semelhante a este:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
Inicializar pipelines de orquestração no repositório
Ao inicializar pipelines de orquestração, a extensão Data Agent Kit para VS Code cria um scaffolding que inclui o seguinte:
- Um arquivo YAML de pipeline de orquestração: um exemplo de definição de pipeline que contém um cronograma, mas nenhuma ação definida.
deployment.yaml: um exemplo de configuração de implantação de pipeline que define como o pipeline precisa ser implantado. Esse arquivo demonstra a configuração necessária para o ambiente do Airflow Gerenciado, o bucket de artefatos e outros recursos usados pelas ações do pipeline..github/workflows/deploy.yaml: configura uma ação do GitHub que implanta seu pipeline quando você mescla as mudanças na ramificaçãomaindo seu repositório do GitHub..github/workflows/validate.yaml: configura uma ação do GitHub que valida seu pipeline depois que ele é implantado.
Nas etapas posteriores deste documento, você vai expandir essas definições usando a extensão Data Agent Kit para VS Code e criar e implantar um pipeline de orquestração localmente.
Não agêntica
Para inicializar pipelines de orquestração, faça o seguinte:
- Clique no ícone Kit do agente de dados do Google Cloud na barra de atividades.
- Expanda Engenharia de dados e clique em Inicializar pipeline de orquestração.
- Insira os parâmetros do novo pipeline de orquestração:
- ID do pipeline: insira o ID do pipeline. Exemplo:
example-pipeline. - ID do projeto na nuvem do Google: o nome do projeto em que o ambiente reside. Exemplo:
example-project. - Região: a região em que seu ambiente reside. Por exemplo:
us-central1. - ID do ambiente: o nome do ambiente em que você quer desenvolver.
Exemplo:
dev/staging. Serviço Gerenciado para Apache Airflow: o nome do ambiente em que você quer orquestrar seus pipelines. Para este documento, especifique o mesmo ambiente nesse parâmetro.
Bucket de artefatos: o nome do bucket usado para artefatos de pipeline, sem o prefixo
gs://. Exemplo:example-pipelines-bucket.Clique em Próxima.
Clique em Inicializar.
Especifique um espaço de trabalho em que você quer inicializar o pipeline.
Agênticos
Peça ao agente para criar um scaffolding para pipelines de orquestração do seu repositório.
Insira um comando semelhante a este:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
Depois de inicializar os pipelines no repositório, não é possível fazer isso de novo, porque o novo scaffolding substituiria as mudanças de configuração feitas. É possível adicionar novos pipelines criando arquivos de definição no projeto e adicionando-os à configuração de implantação.
Adicionar uma nova tarefa ao pipeline
Como a configuração inicial do pipeline não tem ações, adicione uma ação que execute o script PySpark.
Não agêntica
Para editar um pipeline, faça o seguinte:
- Clique no ícone Kit do agente de dados do Google Cloud na barra de atividades.
- Expanda Engenharia de dados e Orquestração de Pipelines.
- Selecione
example-pipeline.yaml. Um editor de pipeline é aberto para o pipeline selecionado. - Opcional: selecione o nó Gatilho de programação. É possível ajustar a programação do pipeline especificando uma expressão semelhante a cron e horários de início e término. A programação padrão do pipeline recém-inicializado é
0 2 * * *, que é executada diariamente às 2h.
Adicione uma tarefa. Neste guia, você adiciona uma tarefa do PySpark que executa um script do PySpark adicionado anteriormente:
- Clique em Adicionar primeira tarefa para incluir um novo nó de tarefa.
- Selecione Executar script PySpark e o arquivo
script/wordcount.py.
O painel Executar script PySpark é aberto.
- No modo de cluster do Spark, selecione Spark sem servidor.
- Em Local, especifique o local em que seu ambiente está.
Exemplo:
us-central1. - Clique em Salvar.
Agênticos
Execute o seguinte comando:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
Implante a versão local do pipeline
Implante a versão local do pipeline para confirmar se ela está configurada corretamente.
Ao implantar uma versão local do pipeline de orquestração, a extensão do Data Agent Kit para VS Code faz upload de uma versão local do pacote de pipeline para o ambiente do Airflow Gerenciado e a executa. A implantação local é destinada ao uso em um ambiente de desenvolvimento.
O comando de implantação implanta uma programação que não está pausada. Para evitar isso, pause
a programação manualmente no painel "Gerenciamento de pipelines". Você também pode editar o arquivo YAML do pipeline para comentar ou remover o bloco triggers: - schedule.
Não agêntica
Para implantar uma versão local do pipeline de orquestração de exemplo, faça o seguinte:
- Clique no ícone Kit do agente de dados do Google Cloud na barra de atividades.
- Expanda Engenharia de dados e depois Orquestração de Pipelines.
- Selecione
example-pipeline.yaml. Um editor de pipeline é aberto para o pipeline selecionado. - Selecione Executar pipeline e escolha o ambiente de desenvolvimento ou de teste que você criou anteriormente.
Agênticos
Execute o seguinte comando:
Deploy my pipeline
Monitorar a execução do pipeline e verificar os registros de execução
Depois que o pipeline for implantado, você poderá conferir as informações detalhadas, o histórico de execuções e os registros de execução dele:
- Clique no ícone Kit do agente de dados do Google Cloud na barra de atividades.
- Expanda Engenharia de dados e selecione Gerenciamento de pipelines.
- Clique no nome do pipeline (
example-pipeline) para conferir o histórico de execução dele. Na lista de execuções de uma data específica, é possível conferir execuções de pipeline individuais e o detalhamento das ações individuais em cada execução. - Clique em um ID de tarefa para conferir os registros de execução dela. Como o script de exemplo do PySpark foi executado no Serviço Gerenciado para Apache Spark, os registros de tarefas terão um link para os registros do Batch.
Resolver problemas e corrigir falhas de pipeline
Quando o pipeline falha, um botão Diagnosticar aparece no painel Gerenciamento de pipelines.
Agênticos
Quando você clica no botão Diagnosticar, o agente gera um comando para resolver o problema da falha do pipeline. O comando é copiado para a área de transferência ou aberto em uma nova sessão de chat.
O agente usa habilidades especializadas para resolver problemas com pipelines, focando em coletar registros, verificar o código implantado e o espaço de trabalho e gerar uma análise de causa raiz (RCA).
Possíveis próximas etapas após receber a RCA:
- Aplique a análise da causa raiz no espaço de trabalho atual.
- Peça ao agente para criar uma nova ramificação e aplicar as mudanças nela.
- Abra um tíquete do Cloud Customer Care com os detalhes da RCA.
Para receber ajuda na solução de problemas com a extensão, consulte Resolver problemas com a extensão do Data Agent Kit para VS Code.