Pipeline de ETL

+

Arquitetura

O pipeline ETL é uma arquitetura para executar pipelines de processamento de dados em lote através do método de extração, transformação e carregamento. Esta arquitetura consiste nos seguintes componentes:

  • Google Cloud Storage para dados de origem de página de destino
  • Dataflow para fazer transformações nos dados de origem
  • BigQuery como destino dos dados transformados
  • Ambiente do Cloud Composer para orquestrar o processo de ETL

Começar

Clique no seguinte link para aceder a uma cópia do código fonte no Cloud Shell. Quando estiver lá, um único comando vai iniciar uma cópia funcional da aplicação no seu projeto.

Abra no Cloud Shell

Veja o código fonte no GitHub


Componentes do pipeline de ETL

A arquitetura do pipeline de ETL usa vários produtos. A lista seguinte apresenta os componentes, juntamente com mais informações sobre os componentes, incluindo links para vídeos relacionados, documentação do produto e visitas guiadas interativas.
Vídeo Docs Instruções passo a passo
BigQuery O BigQuery é um armazém de dados com várias nuvens, sem servidor e económico concebido para ajudar a transformar grandes volumes de dados em estatísticas empresariais valiosas.
Cloud Composer Um serviço de orquestração de fluxos de trabalho totalmente gerido incorporado no Apache Airflow.
Cloud Storage O Cloud Storage oferece armazenamento de ficheiros e publicação pública de imagens através de http(s).

Scripts

O script de instalação usa um executável escrito em go e ferramentas da CLI do Terraform para usar um projeto vazio e instalar a aplicação no mesmo. A saída deve ser uma aplicação funcional e um URL para o endereço IP de equilíbrio de carga.

./main.tf

Ative os serviços

Os serviços Google Cloud estão desativados num projeto por predefinição. Para usar qualquer uma das soluções aqui apresentadas, temos de ativar o seguinte:

  • IAM: gere a identidade e o acesso aos recursos do Google Cloud
  • Armazenamento: serviço para armazenar e aceder aos seus dados no Google Cloud
  • Dataflow: serviço gerido para executar uma grande variedade de padrões de processamento de dados
  • BigQuery: plataforma de dados para criar, gerir, partilhar e consultar dados
  • Composer: gere ambientes do Apache Airflow no Google Cloud
  • Computação: máquinas virtuais e serviços de rede (usados pelo Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Crie uma conta de serviço

Cria uma conta de serviço a ser usada pelo Composer e pelo Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Atribuir funções

Concede as funções necessárias à conta de serviço e concede a função de extensão do agente de serviço da API Cloud Composer v2 ao agente de serviço do Cloud Composer (necessário para ambientes do Composer 2).

variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Crie um ambiente do Composer

O Airflow depende de muitos microsserviços para ser executado, pelo que o Cloud Composer aprovisiona os componentes do Google Cloud para executar os seus fluxos de trabalho. Estes componentes são conhecidos coletivamente como um ambiente do Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Crie um conjunto de dados e uma tabela do BigQuery

Cria um conjunto de dados e uma tabela no BigQuery para armazenar os dados processados e disponibilizá-los para análise.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Crie um contentor do Cloud Storage e adicione ficheiros

Cria um contentor de armazenamento para guardar os ficheiros necessários para o pipeline, incluindo os dados de origem (inputFile.txt), o esquema de destino (jsonSchema.json) e a função definida pelo utilizador para a transformação (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Carregue o ficheiro DAG

Primeiro, usa uma origem de dados para determinar o caminho do contentor do Cloud Storage adequado para adicionar o ficheiro DAG e, em seguida, adiciona os ficheiros DAG ao contentor. O ficheiro DAG define os fluxos de trabalho, as dependências e os agendamentos para o Airflow orquestrar o seu pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusão

Depois de executado, deve ter um ambiente do Composer configurado para executar tarefas de ETL nos dados apresentados no exemplo. Além disso, deve ter todo o código para modificar ou expandir esta solução de forma a adequar-se ao seu ambiente.