Airflow gerenciado (Geração 3) | Airflow gerenciado (Geração 2) | Airflow gerenciado (Geração 1 legada)
Neste tutorial, mostramos como usar o Airflow gerenciado para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um bucket do Cloud Storage e executa um job em lote do Serviço gerenciado para Apache Spark para processar os dados unidos.
O conjunto de dados público do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Airflow Gerenciado na configuração padrão
- Criar um conjunto de dados vazio do BigQuery
- Criar um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- Combinar dois conjuntos de dados no BigQuery
- Executar um job do PySpark de análise de dados
Antes de começar
Ativar APIs
Ative as APIs a seguir:
Console
Ative as APIs Serviço Gerenciado para Apache Spark, Airflow Gerenciado, BigQuery e Cloud Storage.
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de uso do serviço (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder
papéis.
gcloud
Ative as APIs Serviço Gerenciado para Apache Spark, Airflow Gerenciado, BigQuery e Cloud Storage:
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de uso do serviço (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder
papéis.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à sua conta de usuário:
Conceda papéis para gerenciar ambientes do Airflow Gerenciado e buckets de ambiente.
Conceda o papel de Proprietário de dados do BigQuery (
roles/bigquery.dataOwner) para criar um conjunto de dados do BigQuery.Conceda o papel Administrador do Storage (
roles/storage.admin) para criar um bucket do Cloud Storage.
Criar e preparar seu ambiente do Airflow gerenciado
Crie um ambiente do Airflow Gerenciado com parâmetros padrão:
- Escolha uma região nos EUA.
- Escolha a versão mais recente do Airflow gerenciado.
Conceda os seguintes papéis à conta de serviço usada no seu ambiente do Airflow Gerenciado para que os workers do Airflow executem tarefas de DAG com sucesso:
- Usuário do BigQuery (
roles/bigquery.user) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner) - Usuário da conta de serviço (
roles/iam.serviceAccountUser) - Editor do Dataproc (
roles/dataproc.editor) - Worker do Dataproc (
roles/dataproc.worker)
- Usuário do BigQuery (
Criar recursos relacionados
Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather - Região:
US
- Nome:
Crie um bucket do Cloud Storage na multirregião
US.Execute o seguinte comando para ativar o Acesso privado do Google na sub-rede padrão da região em que você quer executar o Serviço gerenciado para Apache Spark e atender aos requisitos de rede. Recomendamos usar a mesma região do seu ambiente do Airflow Gerenciado.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Processamento de dados usando o Serviço Gerenciado para Apache Spark
Analisar o exemplo de job do PySpark
O código abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.
Fazer upload de arquivos de suporte para o Cloud Storage
Para fazer upload do arquivo PySpark e do conjunto de dados armazenado em holidays.csv:
Salve data_analytics_process.py na sua máquina local.
Salve holidays.csv na sua máquina local.
No console do Google Cloud , acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou.
Na guia Objetos do bucket, clique no botão Fazer upload de arquivos, selecione
data_analytics_process.pyeholidays.csvna caixa de diálogo exibida e clique em Abrir.
DAG de análise de dados
Conhecer o exemplo de DAG
O DAG usa vários operadores para transformar e unificar os dados:
O
GCSToBigQueryOperatoringere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dadosholidays_weatherdo BigQuery que você criou anteriormente.O
DataprocCreateBatchOperatorcria e executa um job em lote do PySpark usando o Serviço Gerenciado para Apache Spark.O
BigQueryInsertJobOperatormescla os dados de holidays.csv na coluna "Date" com dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperatorsão geradas dinamicamente usando um loop for, e essas tarefas estão em umTaskGrouppara melhorar a legibilidade na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações ou configurações arbitrárias como um simples armazenamento de chave-valor. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse a interface do Airflow no console do Airflow Gerenciado.
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
gcp_project: o ID do projeto.gcs_bucket: o nome do bucket criado anteriormente (sem o prefixogs://).gce_region: a região em que você quer que seu job do Serviço Gerenciado para Apache Spark atenda aos requisitos de rede do Serviço Gerenciado para Apache Spark. Essa é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account: a conta de serviço do seu ambiente Airflow Gerenciado. Você pode encontrar essa conta de serviço na guia de configuração do ambiente do seu ambiente do Airflow Gerenciado.
Faça upload do DAG para o bucket do ambiente
O Airflow Gerenciado agenda os DAGs que estão na
pasta /dags no bucket do ambiente. Para fazer upload do DAG usando o console do
Google Cloud :
Na sua máquina local, salve data_analytics_dag.py.
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, na coluna Pasta de DAGs, clique no link DAGs. A pasta DAGs do seu ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
data_analytics_dag.pyna sua máquina local e clique em Abrir.
Acionar o DAG
No ambiente do Airflow Gerenciado, clique na guia DAGs.
Clique no ID do DAG
data_analytics_dag.Clique em Acionar DAG.
Aguarde de cinco a dez minutos até ver uma marca de seleção verde indicando que as tarefas foram concluídas com sucesso.
Validar o sucesso do DAG
No console Google Cloud , acesse a página BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined.Clique em "Visualizar" para conferir a tabela resultante. Observação: os números na coluna "Valor" estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized.Clique em "Visualizar" para conferir a tabela resultante. Os números na coluna "Valor" estão em graus Celsius.
Aprofunde-se no Serviço Gerenciado para Apache Spark (opcional)
Você pode testar uma versão avançada desse DAG com um fluxo de processamento de dados PySpark mais complexo. Consulte a extensão do Serviço Gerenciado para Apache Spark para o exemplo de análise de dados no GitHub.
Limpeza
Exclua os recursos individuais criados para este tutorial:
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Airflow gerenciado, incluindo a exclusão manual do bucket do ambiente.
A seguir
- Execute um DAG de análise de dados em Google Cloud Usando dados da AWS.
- Executar um DAG de análise de dados no Azure.