Airflow gerenciado (geração 3) | Airflow gerenciado (geração 2) | Airflow gerenciado (geração 1 legada)
Este tutorial mostra como usar o Airflow gerenciado para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados públicos do BigQuery e um arquivo CSV armazenado em um bucket do Cloud Storage e, em seguida, executa um job em lote do Serviço Gerenciado para Apache Spark para processar os dados unidos.
O conjunto de dados públicos do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Airflow gerenciado na configuração padrão
- Criar um conjunto de dados vazio do BigQuery
- Criar um novo bucket do Cloud Storage
- Criar e executar um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- Unir dois conjuntos de dados no BigQuery
- Executar um job do PySpark de análise de dados
Antes de começar
Ativar APIs
Ative as APIs a seguir:
Console
Ative as APIs do Serviço Gerenciado para Apache Spark, do Airflow gerenciado, do BigQuery e do Cloud Storage.
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de uso do serviço (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder
papéis.
gcloud
Ative as APIs do Serviço Gerenciado para Apache Spark, do Airflow gerenciado, do BigQuery e do Cloud Storage:
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de Service Usage
role (roles/serviceusage.serviceUsageAdmin), que contém a
serviceusage.services.enable permissão. Saiba como conceder
papéis.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à sua conta de usuário:
Conceda papéis para gerenciar ambientes do Airflow Gerenciado e buckets de ambiente.
Conceda o papel de proprietário de dados do BigQuery (
roles/bigquery.dataOwner) para criar um conjunto de dados do BigQuery.Conceda o papel de administrador do Storage (
roles/storage.admin) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Airflow gerenciado
Crie um ambiente do Airflow gerenciado com parâmetros padrão:
- Escolha uma região dos EUA.
- Escolha a versão mais recente do Airflow gerenciado.
Conceda os seguintes papéis à conta de serviço usada no ambiente do Airflow Gerenciado para que os workers do Airflow executem as tarefas do DAG:
- Usuário do BigQuery (
roles/bigquery.user) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner) - Usuário da conta de serviço (
roles/iam.serviceAccountUser) - Editor do Dataproc (
roles/dataproc.editor) - Worker do Dataproc (
roles/dataproc.worker)
- Usuário do BigQuery (
Criar recursos relacionados
Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather - Região:
US
- Nome:
Crie um novo bucket do Cloud Storage na multirregião
US.Execute o comando a seguir para ativar o Acesso privado do Google na sub-rede padrão na região em que você quer executar o Serviço Gerenciado para Apache Spark para atender aos requisitos de rede. Recomendamos usar a mesma região do ambiente do Airflow gerenciado.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Processamento de dados usando o Serviço Gerenciado para Apache Spark
Conferir o exemplo de job do PySpark
O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.
Fazer upload de arquivos de suporte para o Cloud Storage
Para fazer upload do arquivo do PySpark e do conjunto de dados armazenado em holidays.csv:
Salve data_analytics_process.py na sua máquina local.
Salve holidays.csv na sua máquina local.
No Google Cloud console, acesse a página Buckets do Cloud Storage:
Clique no nome do bucket que você criou anteriormente.
Na guia Objetos do bucket, clique no botão Fazer o upload dos arquivos , selecione
data_analytics_process.pyeholidays.csvna caixa de diálogo que aparece e clique em Abrir.
DAG de análise de dados
Conferir o exemplo de DAG
O DAG usa vários operadores para transformar e unificar os dados:
O
GCSToBigQueryOperatoringere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dados do BigQueryholidays_weatherque você criou anteriormente.O
DataprocCreateBatchOperatorcria e executa um job em lote do PySpark usando o Serviço Gerenciado para Apache Spark.O
BigQueryInsertJobOperatorune os dados de holidays.csv na coluna "Date" com dados climáticos do conjunto de dados públicos do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperatorsão geradas dinamicamente usando um loop for, e essas tarefas estão em umTaskGrouppara melhor legibilidade na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, variáveis são uma maneira universal de armazenar e recuperar configurações arbitrárias como um armazenamento de chave-valor simples. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-las ao ambiente:
Acesse a interface do Airflow no console do Airflow gerenciado.
Acesse Admin > Variables.
Adicione as seguintes variáveis:
gcp_project: o ID do projeto.gcs_bucket: o nome do bucket que você criou anteriormente (sem o prefixogs://).gce_region: a região em que você quer que o job do Serviço Gerenciado para Apache Spark atenda aos requisitos de rede do Serviço Gerenciado para Apache Spark. Essa é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account: a conta de serviço do ambiente do Airflow gerenciado. É possível encontrar essa conta de serviço na guia de configuração do ambiente do Airflow gerenciado.
Fazer upload do DAG para o bucket do ambiente
O Airflow gerenciado agenda DAGs localizados na pasta /dags no bucket do ambiente. Para fazer upload do DAG usando o
Google Cloud console:
Na sua máquina local, salve data_analytics_dag.py.
No Google Cloud console, acesse a página Ambientes.
Na lista de ambientes, na coluna Pasta DAG , clique no link DAGs. A pasta DAGs do ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
data_analytics_dag.pyna sua máquina local e clique em Abrir.
Acionar o DAG
No ambiente do Airflow gerenciado, clique na guia DAGs.
Clique no ID do DAG
data_analytics_dag.Clique em DAG de gatilho.
Aguarde de cinco a dez minutos até que uma marca de seleção verde indique que as tarefas foram concluídas.
Validar a conclusão do DAG
No Google Cloud console, acesse a página BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined.Clique em "Visualização" para conferir a tabela resultante. Os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized.Clique em "Visualização" para conferir a tabela resultante. Os números na coluna de valor estão em graus Celsius.
Análise detalhada com o Serviço Gerenciado para Apache Spark (opcional)
É possível testar uma versão avançada desse DAG com um fluxo de processamento de dados do PySpark mais complexo. Consulte a extensão do Serviço Gerenciado para Apache Spark para o exemplo de análise de dados no GitHub.
Revisão dos dados
Exclua os recursos individuais criados para este tutorial:
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Airflow gerenciado, incluindo a exclusão manual do bucket do ambiente.
A seguir
- Executar um DAG de análise de dados em Google Cloud Como usar dados da AWS.
- Executar um DAG de análise de dados no Azure.