Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG junta dados de um conjunto de dados público do BigQuery e de um ficheiro CSV armazenado num contentor do Cloud Storage e, em seguida, executa uma tarefa em lote doGoogle Cloud Serverless para Apache Spark para processar os dados unidos.
O conjunto de dados públicos do BigQuery neste tutorial é o ghcn_d, uma base de dados integrada de resumos climáticos em todo o mundo. O ficheiro CSV contém informações sobre as datas e os nomes dos feriados nos EUA de 1997 a 2021.
A pergunta à qual queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Crie um ambiente do Cloud Composer na configuração predefinida
- Crie um conjunto de dados do BigQuery vazio
- Crie um novo contentor do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregue um conjunto de dados externo do Cloud Storage para o BigQuery
- Junte dois conjuntos de dados no BigQuery
- Execute uma tarefa PySpark de análise de dados
Antes de começar
Ativar APIs
Ative as seguintes APIs:
Consola
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin
), which
contains the serviceusage.services.enable
permission. Learn how to grant
roles.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin
), which contains the
serviceusage.services.enable
permission. Learn how to grant
roles.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder autorizações
Conceda as seguintes funções e autorizações à sua conta de utilizador:
Conceda funções para gerir ambientes do Cloud Composer e contentores de ambientes.
Conceda a função de proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda a função de administrador de armazenamento (
roles/storage.admin
) para criar um contentor do Cloud Storage.
Crie e prepare o seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com os parâmetros predefinidos:
- Escolha uma região sediada nos EUA.
- Escolha a versão do Cloud Composer mais recente.
Conceda as seguintes funções à conta de serviço usada no seu ambiente do Cloud Composer para que os trabalhadores do Airflow executem com êxito as tarefas DAG:
- Utilizador do BigQuery (
roles/bigquery.user
) - Proprietário dos dados do BigQuery (
roles/bigquery.dataOwner
) - Utilizador da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- Utilizador do BigQuery (
Crie recursos relacionados
Crie um conjunto de dados do BigQuery vazio com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um novo contentor do Cloud Storage na multirregião
US
.Execute o seguinte comando para ativar o acesso privado à Google na sub-rede predefinida na região onde quer executar o Google Cloud Serverless para Apache Spark para cumprir os requisitos de rede. Recomendamos que use a mesma região que o seu ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Tratamento de dados com o Google Cloud Serverless para Apache Spark
Explore o exemplo de tarefa do PySpark
O código apresentado abaixo é um exemplo de uma tarefa do PySpark que converte a temperatura de décimas de grau em Celsius para graus Celsius. Esta tarefa converte os dados de temperatura do conjunto de dados num formato diferente.
Carregue ficheiros auxiliares para o Cloud Storage
Para carregar o ficheiro PySpark e o conjunto de dados armazenado em holidays.csv
:
Guarde o ficheiro data_analytics_process.py na sua máquina local.
Guarde o ficheiro holidays.csv na sua máquina local.
Na Google Cloud consola, aceda à página do navegador do Cloud Storage:
Clique no nome do contentor que criou anteriormente.
No separador Objects do contentor, clique no botão Upload files, selecione
data_analytics_process.py
eholidays.csv
na caixa de diálogo apresentada e clique em Open.
DAG de análise de dados
Explore o DAG de exemplo
O DAG usa vários operadores para transformar e unificar os dados:
O comando
GCSToBigQueryOperator
introduz o ficheiro holidays.csv do Cloud Storage numa nova tabela no conjunto de dadosholidays_weather
do BigQuery que criou anteriormente.O comando
DataprocCreateBatchOperator
cria e executa uma tarefa em lote do PySpark usando o Serverless para Apache Spark.O comando
BigQueryInsertJobOperator
junta os dados de holidays.csv na coluna "Date" com os dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente através de um ciclo for, e estas tarefas estão numTaskGroup
para uma melhor legibilidade na vista de gráfico da IU do Airflow.
Use a IU do Airflow para adicionar variáveis
No Airflow, as variáveis são uma forma universal de armazenar e obter definições ou configurações arbitrárias como um simples armazenamento de valores-chave. Este DAG usa variáveis do Airflow para armazenar valores comuns. Para as adicionar ao seu ambiente:
Aceda à IU do Airflow a partir da consola do Cloud Composer.
Aceda a Administração > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do seu projeto.gcs_bucket
: o nome do contentor que criou anteriormente (sem o prefixogs://
).gce_region
: a região onde quer que a tarefa do Dataproc cumpra os Google Cloud requisitos de rede do Serverless para Apache Spark. Esta é a região onde ativou o acesso privado à Google anteriormente.dataproc_service_account
: a conta de serviço do seu ambiente do Cloud Composer. Pode encontrar esta conta de serviço no separador de configuração do ambiente do seu ambiente do Cloud Composer.
Carregue o DAG para o contentor do seu ambiente
O Cloud Composer agenda DAGs localizados na pasta /dags
no contentor do seu ambiente. Para carregar o DAG através da
Google Cloud consola:
Na sua máquina local, guarde o ficheiro data_analytics_dag.py.
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, na coluna Pasta DAG, clique no link DAGs. A pasta DAGs do seu ambiente é aberta.
Clique em Carregar ficheiros.
Selecione
data_analytics_dag.py
no seu computador local e clique em Abrir.
Acione o DAG
No seu ambiente do Cloud Composer, clique no separador DAGs.
Clique no ID do DAG
data_analytics_dag
.Clique em Acionar DAG.
Aguarde cerca de cinco a dez minutos até ver uma marca de verificação verde a indicar que as tarefas foram concluídas com êxito.
Valide o êxito do DAG
Na Google Cloud consola, aceda à página BigQuery.
No painel Explorador, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em graus Celsius.
Análise detalhada com o Google Cloud Serverless para Apache Spark (opcional)
Pode experimentar uma versão avançada deste DAG com um fluxo de processamento de dados PySpark mais complexo. Consulte a extensão do Dataproc para o exemplo de estatísticas de dados no GitHub.
Limpeza
Elimine os recursos individuais que criou para este tutorial:
Elimine o contentor do Cloud Storage que criou para este tutorial.
Elimine o ambiente do Cloud Composer, incluindo a eliminação manual do contentor do ambiente.
O que se segue?
- Execute um DAG de estatísticas de dados em Google Cloud Usar dados da AWS.
- Execute um DAG de análise de dados no Azure.