Executar um DAG de análise de dados em Google Cloud

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este tutorial mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados públicos do BigQuery e um arquivo CSV armazenado em um bucket do Cloud Storage e, em seguida, executa um job em lote do Managed Service for Apache Spark para processar os dados unidos.

O conjunto de dados públicos do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.

A pergunta que queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"

Objetivos

  • Criar um ambiente do Cloud Composer na configuração padrão
  • Criar um conjunto de dados vazio do BigQuery
  • Criar um novo bucket do Cloud Storage
  • Criar e executar um DAG que inclua as seguintes tarefas:
    • Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
    • Unir dois conjuntos de dados no BigQuery
    • Executar um job de análise de dados do PySpark

Antes de começar

Ativar APIs

Ative as APIs a seguir:

Console

Ative as APIs Managed Service for Apache Spark, Cloud Composer, BigQuery e Cloud Storage.

Funções necessárias para ativar APIs

Para ativar as APIs, é necessário ter o papel do IAM de administrador de uso do serviço (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

Ativar as APIs

gcloud

Ative as APIs Managed Service for Apache Spark, Cloud Composer, BigQuery e Cloud Storage:

Funções necessárias para ativar APIs

Para ativar as APIs, é necessário ter o papel do IAM de administrador de Service Usage role (roles/serviceusage.serviceUsageAdmin), que contém a serviceusage.services.enable permissão. Saiba como conceder papéis.

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Conceder permissões

Conceda os seguintes papéis e permissões à sua conta de usuário:

Criar e preparar o ambiente do Cloud Composer

  1. Crie um ambiente do Cloud Composer com parâmetros padrão:

    • Escolha uma região dos EUA.
    • Escolha a versão mais recente do Cloud Composer.
  2. Conceda os seguintes papéis à conta de serviço usada no ambiente do Cloud Composer para que os workers do Airflow executem as tarefas do DAG:

    • Usuário do BigQuery (roles/bigquery.user)
    • Proprietário de dados do BigQuery (roles/bigquery.dataOwner)
    • Usuário da conta de serviço (roles/iam.serviceAccountUser)
    • Editor do Dataproc (roles/dataproc.editor)
    • Worker do Dataproc (roles/dataproc.worker)
  1. Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:

    • Nome: holiday_weather
    • Região: US
  2. Crie um novo bucket do Cloud Storage na multirregião US.

  3. Execute o comando a seguir para ativar o Acesso privado do Google na sub-rede padrão na região em que você quer executar o Managed Service for Apache Spark para atender aos requisitos de rede. Recomendamos usar a mesma região do ambiente do Cloud Composer.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Processamento de dados usando o Managed Service for Apache Spark

Conferir o exemplo de job do PySpark

O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Fazer upload de arquivos de suporte para o Cloud Storage

Para fazer upload do arquivo do PySpark e do conjunto de dados armazenado em holidays.csv:

  1. Salve data_analytics_process.py na sua máquina local.

  2. Salve holidays.csv na sua máquina local.

  3. No Google Cloud console, acesse a página Navegador do Cloud Storage:

    Ir para o navegador do Cloud Storage

  4. Clique no nome do bucket que você criou.

  5. Na guia Objetos do bucket, clique no botão Fazer o upload de arquivos , selecione data_analytics_process.py e holidays.csv na caixa de diálogo exibida e clique em Abrir.

DAG de análise de dados

Conferir o exemplo de DAG

O DAG usa vários operadores para transformar e unificar os dados:

  • O GCSToBigQueryOperator ingere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dados BigQuery holidays_weather que você criou.

  • O DataprocCreateBatchOperator cria e executa um job em lote do PySpark usando Managed Service for Apache Spark.

  • O BigQueryInsertJobOperator une os dados de holidays.csv na coluna "Date" com dados climáticos do conjunto de dados públicos do BigQuery ghcn_d. As tarefas BigQueryInsertJobOperator são geradas dinamicamente usando um loop for, e essas tarefas estão em um TaskGroup para melhor legibilidade na visualização de gráfico da interface do Airflow.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )
    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        load_external_dataset >> bq_join_group >> create_batch

Usar a interface do Airflow para adicionar variáveis

No Airflow, variáveis são uma maneira universal de armazenar e recuperar configurações arbitrárias como um armazenamento de chave-valor simples. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-las ao ambiente:

  1. Acesse a interface do Airflow no console do Cloud Composer.

  2. Acesse Admin > Variables.

  3. Adicione as seguintes variáveis:

    • gcp_project: o ID do projeto.

    • gcs_bucket: o nome do bucket que você criou (sem o prefixo gs://).

    • gce_region: a região em que você quer que o job do Managed Service for Apache Spark atenda aos requisitos de rede do Managed Service for Apache Spark. Essa é a região em que você ativou o Acesso privado do Google.

    • dataproc_service_account: a conta de serviço do ambiente do Cloud Composer. Essa conta de serviço pode ser encontrada na guia de configuração do ambiente do Cloud Composer.

Fazer upload do DAG para o bucket do ambiente

O Cloud Composer agenda DAGs que estão na pasta /dags no bucket do ambiente. Para fazer upload do DAG usando o Google Cloud console:

  1. Na sua máquina local, salve data_analytics_dag.py.

  2. No Google Cloud console, acesse a página Ambientes.

    Acessar "Ambientes"

  3. Na lista de ambientes, na coluna Pasta de DAGs , clique no link DAGs. A pasta DAGs do ambiente será aberta.

  4. Clique em Fazer o upload dos arquivos.

  5. Selecione data_analytics_dag.py na sua máquina local e clique em Abrir.

Acionar o DAG

  1. No ambiente do Cloud Composer, clique na guia DAGs.

  2. Clique no ID do DAG data_analytics_dag.

  3. Clique em DAG de gatilho.

  4. Aguarde de cinco a dez minutos até que uma marca de seleção verde indique que as tarefas foram concluídas.

Validar a conclusão do DAG

  1. No Google Cloud console, acesse a página BigQuery.

    Acessar o BigQuery

  2. No painel Explorer, clique no nome do projeto.

  3. Clique em holidays_weather_joined.

  4. Clique em "Visualização" para conferir a tabela resultante. Os números na coluna de valor estão em décimos de grau Celsius.

  5. Clique em holidays_weather_normalized.

  6. Clique em "Visualização" para conferir a tabela resultante. Os números na coluna de valor estão em graus Celsius.

Aprofundamento com o Managed Service for Apache Spark (opcional)

Você pode testar uma versão avançada desse DAG com um fluxo de processamento de dados do PySpark mais complexo. Consulte a extensão do Managed Service for Apache Spark para o exemplo de análise de dados no GitHub.

Revisão dos dados

Exclua os recursos individuais criados para este tutorial:

A seguir