Airflow gerenciado (geração 3) | Airflow gerenciado (geração 2) | Airflow gerenciado (geração 1 legada)
Este tutorial é uma modificação de Executar um DAG de análise de dados no Google Cloud que mostra como conectar o ambiente do Airflow gerenciado ao Amazon Web Services para usar os dados armazenados nele. Ele mostra como usar o Airflow gerenciado para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados públicos do BigQuery e um arquivo CSV armazenado em um bucket do Amazon Web Services (AWS) S3 e, em seguida, executa um job em lote do Serviço gerenciado para Apache Spark para processar os dados unidos.
O conjunto de dados públicos do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Airflow gerenciado na configuração padrão
- Criar um bucket no AWS S3
- Criar um conjunto de dados vazio do BigQuery
- Criar um novo bucket do Cloud Storage
- Criar e executar um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do S3 para o Cloud Storage
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- Unir dois conjuntos de dados no BigQuery
- Executar um job do PySpark de análise de dados
Antes de começar
Gerenciar permissões na AWS
Siga a seção "Como criar políticas com o editor visual" do tutorial Como criar políticas do IAM da AWS para criar uma política do IAM personalizada para o AWS S3 com a seguinte configuração:
- Serviço:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets), para visualizar seu bucket do S3 - CreateBucket (
s3:CreateBucket), para criar um bucket - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls), para criar um bucket - ListBucket (
s3:ListBucket), para conceder permissão para listar objetos em um bucket do S3 - PutObject (
s3:PutObject), para fazer upload de arquivos para um bucket - GetBucketVersioning (
s3:GetBucketVersioning), para excluir um objeto em um bucket - DeleteObject (
s3:DeleteObject), para excluir um objeto em um bucket - ListBucketVersions (
s3:ListBucketVersions), para excluir um bucket - DeleteBucket (
s3:DeleteBucket), para excluir um bucket - Recursos:escolha "Qualquer" ao lado de "bucket" e "objeto" para conceder permissões a todos os recursos desse tipo.
- Tag:nenhuma
- Nome:TutorialPolicy
Consulte a lista de ações compatíveis com o Amazon S3 para mais informações sobre cada configuração.
Ativar APIs
Ative as APIs a seguir:
Console
Ative as APIs Serviço gerenciado para Apache Spark, Airflow gerenciado, BigQuery e Cloud Storage.
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de uso do serviço (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder
papéis.
gcloud
Ative as APIs Serviço gerenciado para Apache Spark, Airflow gerenciado, BigQuery e Cloud Storage:
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de Service Usage
role (roles/serviceusage.serviceUsageAdmin), que contém a
serviceusage.services.enable permissão. Saiba como conceder
papéis.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à sua conta de usuário:
Conceda papéis para gerenciar ambientes do Airflow gerenciado e buckets de ambiente.
Conceda o papel de Proprietário de dados do BigQuery (
roles/bigquery.dataOwner) para criar um conjunto de dados do BigQuery.Conceda o papel de Administrador de armazenamento (
roles/storage.admin) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Airflow gerenciado
Crie um ambiente do Airflow gerenciado com parâmetros padrão:
- Escolha uma região dos EUA.
- Escolha a versão mais recente do Airflow gerenciado.
Conceda os seguintes papéis à conta de serviço usada no ambiente do Airflow Gerenciado para que os workers do Airflow executem as tarefas do DAG:
- Usuário do BigQuery (
roles/bigquery.user) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner) - Usuário da conta de serviço (
roles/iam.serviceAccountUser) - Editor do Dataproc (
roles/dataproc.editor) - Worker do Dataproc (
roles/dataproc.worker)
- Usuário do BigQuery (
Criar e modificar recursos relacionados no Google Cloud
Instale o pacote do PyPI no ambiente do Airflow gerenciado.
apache-airflow-providers-amazonCrie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather - Região:
US
- Nome:
Crie um novo bucket do Cloud Storage na multirregião
US.Execute o seguinte comando para ativar o Acesso privado do Google na sub-rede padrão na região em que você quer executar o Serviço gerenciado para Apache Spark para atender aos requisitos de rede. Recomendamos usar a mesma região do ambiente do Airflow gerenciado.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Criar recursos relacionados na AWS
Crie um bucket do S3 com as configurações padrão na região de sua preferência.
Conectar-se à AWS pelo Airflow gerenciado
- Receber o ID da chave de acesso e a chave de acesso secreta da AWS
Adicione sua conexão do AWS S3 conexão usando a interface do Airflow:
- Acesse Administrador > Conexões.
Crie uma nova conexão com a seguinte configuração:
- ID da conexão:
aws_s3_connection - Tipo de conexão:
Amazon S3 - Extras (ou JSON de campos extras):
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- ID da conexão:
Processamento de dados usando o Serviço gerenciado para Apache Spark
Esta seção descreve o processamento de dados com o Serviço gerenciado para Apache Spark.
Conferir o exemplo de job do PySpark
O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.
Fazer upload do arquivo PySpark para o Cloud Storage
Para fazer upload do arquivo PySpark para o Cloud Storage:
Salve data_analytics_process.py na sua máquina local.
No Google Cloud console, acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou.
Na guia Objetos do bucket, clique no botão Fazer o upload de arquivos , selecione
data_analytics_process.pyna caixa de diálogo exibida e clique em Abrir.
Fazer upload do arquivo CSV para o AWS S3
Para fazer upload do arquivo holidays.csv:
- Salve
holidays.csvna sua máquina local. - Siga o guia da AWS para fazer upload do arquivo no seu bucket.
DAG de análise de dados
Esta seção descreve como configurar e usar o DAG de análise de dados.
Conferir o exemplo de DAG
O DAG usa vários operadores para transformar e unificar os dados:
O
S3ToGCSOperatortransfere o arquivo holidays.csv do bucket do AWS S3 para o bucket do Cloud Storage.O
GCSToBigQueryOperatoringere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dadosholidays_weatherdo BigQuery que você criou.O
DataprocCreateBatchOperatorcria e executa um job em lote do PySpark usando o Serviço gerenciado para Apache Spark.O
BigQueryInsertJobOperatorune os dados de holidays.csv na coluna "Date" com dados climáticos do conjunto de dados públicos do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperatorsão geradas dinamicamente usando um loop for, e essas tarefas estão em umTaskGrouppara melhor legibilidade na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações arbitrárias como um armazenamento de chave-valor simples. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse a interface do Airflow no Google Cloud console.
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
s3_bucket: o nome do bucket do S3 que você criou.gcp_project: o ID do projeto.gcs_bucket: o nome do bucket que você criou (sem o prefixogs://).gce_region: a região em que você quer que o job do Serviço gerenciado para Apache Spark atenda aos requisitos de rede do Serviço gerenciado para Apache Spark de rede. Essa é a região em que você ativou o Acesso privado do Google.dataproc_service_account: a conta de serviço do ambiente do Airflow gerenciado. Você pode encontrar essa conta de serviço na guia de configuração do ambiente do Airflow gerenciado.
Fazer upload do DAG para o bucket do ambiente
O Airflow gerenciado agenda DAGs localizados na pasta /dags no bucket do ambiente. Para fazer upload do DAG usando o
Google Cloud console:
Na sua máquina local, salve s3togcsoperator_tutorial.py.
No Google Cloud console, acesse a página Ambientes.
Na lista de ambientes, na coluna Pasta de DAGs , clique no link DAGs. A pasta DAGs do ambiente será aberta.
Clique em Fazer o upload dos arquivos.
Selecione
s3togcsoperator_tutorial.pyna sua máquina local e clique em Abrir.
Acionar o DAG
No ambiente do Airflow gerenciado, clique na guia DAGs.
Clique no ID do DAG
s3_to_gcs_dag.Clique em DAG de gatilho.
Aguarde de cinco a dez minutos até que uma marca de seleção verde indique que as tarefas foram concluídas.
Validar a conclusão do DAG
No Google Cloud console, acesse a página BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined.Clique em "Visualizar" para conferir a tabela resultante. Os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized.Clique em "Visualizar" para conferir a tabela resultante. Os números na coluna de valor estão em graus Celsius.
Revisão dos dados
Exclua os recursos individuais que você criou para este tutorial:
Exclua o arquivo
holidays.csvno bucket do AWS S3.Exclua o bucket do AWS S3 que você criou.
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Airflow gerenciado, incluindo a exclusão manual do bucket do ambiente.