Arquitetura
O diagrama seguinte mostra a arquitetura de alto nível de um pipeline de extração, carregamento e transformação (ELT) sem servidor através dos Workflows.
No diagrama anterior, considere uma plataforma de retalho que recolhe periodicamente eventos de vendas como ficheiros de várias lojas e, em seguida, escreve os ficheiros num contentor do Cloud Storage. Os eventos são usados para fornecer métricas empresariais através da importação e do processamento no BigQuery. Esta arquitetura oferece um sistema de orquestração fiável e sem servidor para importar os seus ficheiros para o BigQuery e está dividida nos dois módulos seguintes:
- Lista de ficheiros: mantém a lista de ficheiros não processados adicionados a um contentor do Cloud Storage numa coleção do Firestore.
Este módulo funciona através de uma função do Cloud Run acionada por um evento de armazenamento Object Finalize, que é gerado quando um novo ficheiro é adicionado ao contentor do Cloud Storage. O nome do ficheiro é anexado à matriz da coleção denominada
new
no Firestore.files
Fluxo de trabalho: executa os fluxos de trabalho agendados. O Cloud Scheduler aciona um fluxo de trabalho que executa uma série de passos de acordo com uma sintaxe baseada em YAML para orquestrar o carregamento e, em seguida, transformar os dados no BigQuery chamando funções do Cloud Run. Os passos no fluxo de trabalho chamam funções do Cloud Run para executar as seguintes tarefas:
- Crie e inicie uma tarefa de carregamento do BigQuery.
- Verifique o estado da tarefa de carregamento.
- Crie e inicie a tarefa de consulta de transformação.
- Verifique o estado da tarefa de transformação.
A utilização de transações para manter a lista de novos ficheiros no Firestore ajuda a garantir que nenhum ficheiro é ignorado quando um fluxo de trabalho os importa para o BigQuery. As execuções separadas do fluxo de trabalho são tornadas idempotentes através do armazenamento do estado e dos metadados das tarefas no Firestore.
Prepare o seu ambiente
Para preparar o seu ambiente, crie uma base de dados do Firestore, clone os exemplos de código do repositório do GitHub, crie recursos com o Terraform, edite o ficheiro YAML dos fluxos de trabalho e instale os requisitos para o gerador de ficheiros.
Para criar uma base de dados do Firestore, faça o seguinte:
Na Google Cloud consola, aceda à página do Firestore.
Clique em Selecionar modo nativo.
No menu Selecionar uma localização, selecione a região onde quer alojar a base de dados do Firestore. Recomendamos que escolha uma região próxima da sua localização física.
Clique em Criar base de dados.
No Cloud Shell, clone o repositório de origem:
cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos cd workflows-demos/workflows-bigquery-load
No Cloud Shell, crie os seguintes recursos com o Terraform:
terraform init terraform apply \ -var project_id=PROJECT_ID \ -var region=REGION \ -var zone=ZONE \ --auto-approve
Substitua o seguinte:
PROJECT_ID
: o ID do seu Google Cloud projetoREGION
: uma localização geográfica Google Cloud específica para alojar os seus recursos, por exemplo,us-central1
ZONE
: uma localização numa região para alojar os seus recursos, por exemplo,us-central1-b
Deverá ver uma mensagem semelhante à seguinte:
Apply complete! Resources: 7 added, 0 changed, 1 destroyed.
O Terraform pode ajudar a criar, alterar e atualizar a infraestrutura em grande escala de forma segura e previsível. Os seguintes recursos são criados no seu projeto:
- Contas de serviço com os privilégios necessários para garantir o acesso seguro aos seus recursos.
- Um conjunto de dados do BigQuery denominado
serverless_elt_dataset
e uma tabela denominadaword_count
para carregar os ficheiros recebidos. - Um contentor do Cloud Storage com o nome
${project_id}-ordersbucket
para preparar ficheiros de entrada. - As cinco funções do Cloud Run seguintes:
- O
file_add_handler
adiciona o nome dos ficheiros que são adicionados ao contentor do Cloud Storage à coleção do Firestore. create_job
cria uma nova tarefa de carregamento do BigQuery e associa ficheiros na coleção do Firebase à tarefa.create_query
cria uma nova tarefa de consulta do BigQuery.poll_bigquery_job
recebe o estado de uma tarefa do BigQuery.run_bigquery_job
inicia uma tarefa do BigQuery.
- O
Obtenha os URLs das funções do
create_job
,create_query
,poll_job
erun_bigquery_job
do Cloud Run que implementou no passo anterior.gcloud functions describe create_job | grep url gcloud functions describe poll_bigquery_job | grep url gcloud functions describe run_bigquery_job | grep url gcloud functions describe create_query | grep url
O resultado é semelhante ao seguinte:
url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
Tome nota destes URLs, uma vez que são necessários quando implementar o fluxo de trabalho.
Crie e implemente um fluxo de trabalho
No Cloud Shell, abra o ficheiro de origem do fluxo de trabalho,
workflow.yaml
:Substitua o seguinte:
CREATE_JOB_URL
: o URL da função para criar uma nova tarefaPOLL_BIGQUERY_JOB_URL
: o URL da função para sondar o estado de uma tarefa em execuçãoRUN_BIGQUERY_JOB_URL
: o URL da função para iniciar uma tarefa de carregamento do BigQueryCREATE_QUERY_URL
: o URL da função para iniciar uma tarefa de consulta do BigQueryBQ_REGION
: a região do BigQuery onde os dados são armazenados, por exemplo,US
BQ_DATASET_TABLE_NAME
: o nome da tabela do conjunto de dados do BigQuery no formatoPROJECT_ID.serverless_elt_dataset.word_count
Implemente o ficheiro
workflow
:gcloud workflows deploy WORKFLOW_NAME \ --location=WORKFLOW_REGION \ --description='WORKFLOW_DESCRIPTION' \ --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \ --source=workflow.yaml
Substitua o seguinte:
WORKFLOW_NAME
: o nome exclusivo do fluxo de trabalhoWORKFLOW_REGION
: a região na qual o fluxo de trabalho é implementado, por exemplo,us-central1
WORKFLOW_DESCRIPTION
: a descrição do fluxo de trabalho
Crie um ambiente virtual do Python 3 e instale os requisitos para o gerador de ficheiros:
sudo apt-get install -y python3-venv python3 -m venv env . env/bin/activate cd generator pip install -r requirements.txt
Gere ficheiros para importar
O script Python gera conteúdo aleatório no formato Avro.gen.py
O esquema é igual ao da tabela word_count
do BigQuery. Estes ficheiros Avro são copiados para o contentor do Cloud Storage especificado.
No Cloud Shell, gere os ficheiros:
python gen.py -p PROJECT_ID \ -o PROJECT_ID-ordersbucket \ -n RECORDS_PER_FILE \ -f NUM_FILES \ -x FILE_PREFIX
Substitua o seguinte:
RECORDS_PER_FILE
: o número de registos num único ficheiroNUM_FILES
: o número total de ficheiros a carregarFILE_PREFIX
: o prefixo dos nomes dos ficheiros gerados
Veja entradas de ficheiros no Firestore
Quando os ficheiros são copiados para o Cloud Storage, a função do Cloud Run é acionada.handle_new_file
Esta função adiciona a lista de ficheiros à matriz de listas de ficheiros no documento new
na coleção jobs
do Firestore.
Para ver a lista de ficheiros, na Google Cloud consola, aceda à página Dados do Firestore.
Acione o fluxo de trabalho
Os fluxos de trabalho associam uma série de tarefas sem servidor a partir de serviços de Google Cloud e API. Os passos individuais neste fluxo de trabalho são executados como funções do Cloud Run e o estado é armazenado no Firestore. Todas as chamadas para as funções do Cloud Run são autenticadas através da conta de serviço do fluxo de trabalho.
No Cloud Shell, execute o fluxo de trabalho:
gcloud workflows execute WORKFLOW_NAME
O diagrama seguinte mostra os passos usados no fluxo de trabalho:
O fluxo de trabalho está dividido em duas partes: o fluxo de trabalho principal e o fluxo de trabalho secundário. O fluxo de trabalho principal processa a criação de tarefas e a execução condicional, enquanto o fluxo de trabalho secundário executa uma tarefa do BigQuery. O fluxo de trabalho realiza as seguintes operações:
- A função do
create_job
Cloud Run cria um novo objeto de tarefa, obtém a lista de ficheiros adicionados ao Cloud Storage a partir do documento do Firestore e associa os ficheiros à tarefa de carregamento. Se não existirem ficheiros para carregar, a função não cria uma nova tarefa. - A função do
create_query
Cloud Run recebe a consulta que tem de ser executada, juntamente com a região do BigQuery na qual a consulta deve ser executada. A função cria a tarefa no Firestore e devolve o ID da tarefa. - A função do
run_bigquery_job
Cloud Run recebe o ID da tarefa que tem de ser executada e, em seguida, chama a API BigQuery para enviar a tarefa. - Em vez de aguardar a conclusão da tarefa na função do Cloud Run, pode sondar periodicamente o estado da tarefa.
- A função do Cloud Run fornece o estado da tarefa.
poll_bigquery_job
É chamado repetidamente até que a tarefa seja concluída. - Para adicionar um atraso entre as chamadas à função do Cloud Run, é chamada uma
sleep
rotina a partir dos fluxos de trabalho.poll_bigquery_job
- A função do Cloud Run fornece o estado da tarefa.
Veja o estado da tarefa
Pode ver a lista de ficheiros e o estado da tarefa.
NaGoogle Cloud consola, aceda à página Dados do Firestore.
É gerado um identificador exclusivo (UUID) para cada tarefa. Para ver o
job_type
e ostatus
, clique no ID da tarefa. Cada tarefa pode ter um dos seguintes tipos e estados:job_type
: o tipo de tarefa que está a ser executada pelo fluxo de trabalho com um dos seguintes valores:- 0: Carregue dados para o BigQuery.
- 1: execute uma consulta no BigQuery.
status
: o estado atual da tarefa com um dos seguintes valores:- 0: A tarefa foi criada, mas não iniciada.
- 1: O trabalho está em execução.
- 2: O trabalho concluiu a execução com êxito.
- 3: Ocorreu um erro e a tarefa não foi concluída com êxito.
O objeto de tarefa também contém atributos de metadados, como a região do conjunto de dados do BigQuery, o nome da tabela do BigQuery e, se for uma tarefa de consulta, a string de consulta que está a ser executada.
Veja os dados no BigQuery
Para confirmar que a tarefa ELT foi bem-sucedida, verifique se os dados aparecem na tabela.
Na Google Cloud consola, aceda à página do editor do BigQuery.
Clique na tabela
serverless_elt_dataset.word_count
.Clique no separador Pré-visualizar.
Agende o fluxo de trabalho
Para executar periodicamente o fluxo de trabalho numa programação, pode usar o Cloud Scheduler.