Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial é uma modificação do artigo Execute um DAG de análise de dados no Google Cloud, que mostra como associar o seu ambiente do Cloud Composer ao Microsoft Azure para usar os dados aí armazenados. Mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG junta dados de um conjunto de dados público do BigQuery e um ficheiro CSV armazenado no Azure Blob Storage e, em seguida, executa uma tarefa em lote do Serverless for Apache Spark para processar os dados unidos. Google Cloud
O conjunto de dados públicos do BigQuery neste tutorial é o ghcn_d, uma base de dados integrada de resumos climáticos em todo o mundo. O ficheiro CSV contém informações sobre as datas e os nomes dos feriados nos EUA de 1997 a 2021.
A pergunta à qual queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Crie um ambiente do Cloud Composer na configuração predefinida
- Crie um blob no Azure
- Crie um conjunto de dados do BigQuery vazio
- Crie um novo contentor do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregue um conjunto de dados externo do Azure Blob Storage para o Cloud Storage
- Carregue um conjunto de dados externo do Cloud Storage para o BigQuery
- Junte dois conjuntos de dados no BigQuery
- Execute uma tarefa PySpark de análise de dados
Antes de começar
Ativar APIs
Ative as seguintes APIs:
Consola
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin
), which
contains the serviceusage.services.enable
permission. Learn how to grant
roles.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin
), which contains the
serviceusage.services.enable
permission. Learn how to grant
roles.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder autorizações
Conceda as seguintes funções e autorizações à sua conta de utilizador:
Conceda funções para gerir ambientes do Cloud Composer e contentores de ambientes.
Conceda a função de proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda a função de administrador de armazenamento (
roles/storage.admin
) para criar um contentor do Cloud Storage.
Crie e prepare o seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com os parâmetros predefinidos:
- Escolha uma região sediada nos EUA.
- Escolha a versão do Cloud Composer mais recente.
Conceda as seguintes funções à conta de serviço usada no seu ambiente do Cloud Composer para que os trabalhadores do Airflow executem com êxito as tarefas DAG:
- Utilizador do BigQuery (
roles/bigquery.user
) - Proprietário dos dados do BigQuery (
roles/bigquery.dataOwner
) - Utilizador da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- Utilizador do BigQuery (
Crie e modifique recursos relacionados no Google Cloud
Instale o
apache-airflow-providers-microsoft-azure
pacote PyPI no seu ambiente do Cloud Composer.Crie um conjunto de dados do BigQuery vazio com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um novo contentor do Cloud Storage na multirregião
US
.Execute o seguinte comando para ativar o acesso privado à Google na sub-rede predefinida na região onde quer executar o Google Cloud Serverless para Apache Spark para cumprir os requisitos de rede. Recomendamos que use a mesma região que o seu ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Crie recursos relacionados no Azure
Crie uma conta de armazenamento com as predefinições.
Obtenha a chave de acesso e a string de ligação para a sua conta de armazenamento.
Crie um contentor com as opções predefinidas na conta de armazenamento que acabou de criar.
Conceda a função de delegador de blobs de armazenamento para o contentor criado no passo anterior.
Carregue o ficheiro holidays.csv para criar um blob de blocos com as opções predefinidas no portal do Azure.
Crie um token SAS para o blob de blocos que criou no passo anterior no portal do Azure.
- Método de assinatura: chave de delegação do utilizador
- Autorizações: leitura
- Endereço IP permitido: nenhum
- Protocolos permitidos: apenas HTTPS
Estabeleça ligação ao Azure a partir do Cloud Composer
Adicione a sua ligação do Microsoft Azure através da IU do Airflow:
Aceda a Administração > Ligações.
Crie uma nova associação com a seguinte configuração:
- ID da associação:
azure_blob_connection
- Tipo de ligação:
Azure Blob Storage
- Início de sessão no armazenamento de blobs: o nome da sua conta de armazenamento
- Chave de armazenamento de blobs: a chave de acesso para a sua conta de armazenamento
- String de ligação da conta de armazenamento de blobs: a string de ligação da conta de armazenamento
- Token SAS: o token SAS gerado a partir do seu blob
- ID da associação:
Tratamento de dados com o Google Cloud Serverless para Apache Spark
Explore o exemplo de tarefa do PySpark
O código apresentado abaixo é um exemplo de uma tarefa do PySpark que converte a temperatura de décimas de grau em Celsius para graus Celsius. Esta tarefa converte os dados de temperatura do conjunto de dados num formato diferente.
Carregue o ficheiro PySpark para o Cloud Storage
Para carregar o ficheiro PySpark para o Cloud Storage:
Guarde o ficheiro data_analytics_process.py na sua máquina local.
Na Google Cloud consola, aceda à página do navegador do Cloud Storage:
Clique no nome do contentor que criou anteriormente.
No separador Objetos do contentor, clique no botão Carregar ficheiros, selecione
data_analytics_process.py
na caixa de diálogo apresentada e clique em Abrir.
DAG de análise de dados
Explore o DAG de exemplo
O DAG usa vários operadores para transformar e unificar os dados:
O comando
AzureBlobStorageToGCSOperator
transfere o ficheiro holidays.csv do seu blob de blocos do Azure para o contentor do Cloud Storage.O comando
GCSToBigQueryOperator
introduz o ficheiro holidays.csv do Cloud Storage numa nova tabela no conjunto de dadosholidays_weather
do BigQuery que criou anteriormente.O comando
DataprocCreateBatchOperator
cria e executa uma tarefa em lote do PySpark usando o Serverless para Apache Spark.O comando
BigQueryInsertJobOperator
junta os dados de holidays.csv na coluna "Date" com os dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente através de um ciclo for, e estas tarefas estão numTaskGroup
para uma melhor legibilidade na vista de gráfico da IU do Airflow.
Use a IU do Airflow para adicionar variáveis
No Airflow, as variáveis são uma forma universal de armazenar e obter definições ou configurações arbitrárias como um simples armazenamento de valores-chave. Este DAG usa variáveis do Airflow para armazenar valores comuns. Para as adicionar ao seu ambiente:
Aceda à IU do Airflow a partir da consola do Cloud Composer.
Aceda a Administração > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do seu projeto.gcs_bucket
: o nome do contentor que criou anteriormente (sem o prefixogs://
).gce_region
: a região onde quer que a tarefa do Dataproc cumpra os Google Cloud requisitos de rede do Serverless para Apache Spark. Esta é a região onde ativou o acesso privado à Google anteriormente.dataproc_service_account
: a conta de serviço do seu ambiente do Cloud Composer. Pode encontrar esta conta de serviço no separador de configuração do ambiente do seu ambiente do Cloud Composer.azure_blob_name
: o nome do blob que criou anteriormente.azure_container_name
: o nome do contentor que criou anteriormente.
Carregue o DAG para o contentor do seu ambiente
O Cloud Composer agenda DAGs localizados na pasta /dags
no contentor do seu ambiente. Para carregar o DAG através da
Google Cloud consola:
Na sua máquina local, guarde o ficheiro azureblobstoretogcsoperator_tutorial.py.
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, na coluna Pasta DAG, clique no link DAGs. A pasta DAGs do seu ambiente é aberta.
Clique em Carregar ficheiros.
Selecione
azureblobstoretogcsoperator_tutorial.py
no seu computador local e clique em Abrir.
Acione o DAG
No seu ambiente do Cloud Composer, clique no separador DAGs.
Clique no ID do DAG
azure_blob_to_gcs_dag
.Clique em Acionar DAG.
Aguarde cerca de cinco a dez minutos até ver uma marca de verificação verde a indicar que as tarefas foram concluídas com êxito.
Valide o êxito do DAG
Na Google Cloud consola, aceda à página BigQuery.
No painel Explorador, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em graus Celsius.
Limpeza
Elimine os recursos individuais que criou para este tutorial:
Elimine o contentor do Cloud Storage que criou para este tutorial.
Elimine o ambiente do Cloud Composer, incluindo a eliminação manual do contentor do ambiente.