Ejecuta un DAG de análisis de datos en Google Cloud con datos de AWS

Managed Airflow (3ª gen.) | Managed Airflow (2ª gen.) | Managed Airflow (1ª gen. heredada)

Este instructivo es una modificación de Ejecuta un DAG de análisis de datos en Google Cloud que muestra cómo conectar tu entorno de Managed Airflow a Amazon Web Services para utilizar los datos almacenados allí. Muestra cómo usar Airflow administrado para crear un DAG de Apache Airflow. El DAG une datos de un conjunto de datos públicos de BigQuery y un archivo CSV almacenado en un bucket de Amazon Web Services (AWS) S3 y, luego, ejecuta un trabajo por lotes de Managed Service para Apache Spark para procesar los datos unidos.

El conjunto de datos públicos de BigQuery en este instructivo es ghcn_d, una base de datos integrada de resúmenes climáticos en todo el mundo. El archivo CSV contiene información sobre las fechas y los nombres de los días festivos de EE.UU. de 1997 a 2021.

La pregunta que queremos responder con el DAG es la siguiente: "¿Qué tan cálido estuvo en Chicago en el Día de Acción de Gracias durante los últimos 25 años?".

Objetivos

  • Crea un entorno de Managed Airflow con la configuración predeterminada.
  • Crea un bucket en AWS S3.
  • Crea un conjunto de datos vacío de BigQuery.
  • Crea un nuevo bucket de Cloud Storage.
  • Crea y ejecuta un DAG que incluya las siguientes tareas:
    • Carga un conjunto de datos externo de S3 a Cloud Storage.
    • Carga un conjunto de datos externo de Cloud Storage a BigQuery.
    • Une dos conjuntos de datos en BigQuery.
    • Ejecuta un trabajo de PySpark de análisis de datos.

Antes de comenzar

Administra permisos en AWS

  1. Crea una cuenta de AWS.

  2. Sigue la sección "Creación de políticas con el editor visual" del instructivo de AWS sobre cómo crear políticas de IAM para crear una política de IAM personalizada para AWS S3 con la siguiente configuración:

    • Servicio: S3
    • ListAllMyBuckets (s3:ListAllMyBuckets), para ver tu bucket de S3
    • CreateBucket (s3:CreateBucket), para crear un bucket
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls), para crear un bucket
    • ListBucket (s3:ListBucket), para otorgar permiso para enumerar objetos en un bucket de S3
    • PutObject (s3:PutObject), para subir archivos a un bucket
    • GetBucketVersioning (s3:GetBucketVersioning), para borrar un objeto en un bucket
    • DeleteObject (s3:DeleteObject), para borrar un objeto en un bucket
    • ListBucketVersions (s3:ListBucketVersions), para borrar un bucket
    • DeleteBucket (s3:DeleteBucket), para borrar un bucket
    • Recursos: Elige "Cualquiera" junto a "bucket" y "objeto" para otorgar permisos a cualquier recurso de ese tipo.
    • Etiqueta: Ninguno
    • Nombre: TutorialPolicy

    Consulta la lista de acciones compatibles con Amazon S3 para obtener más información sobre cada configuración.

  3. Agrega la política de IAM TutorialPolicy a tu identidad.

Habilita las APIs

Habilita las siguientes APIs:

Console

Habilita las APIs de Managed Service para Apache Spark, Managed Airflow, BigQuery y Cloud Storage.

Roles necesarios para habilitar las APIs

Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el permiso serviceusage.services.enable. Obtén más información para otorgar roles.

Habilitar las API

gcloud

Habilita las APIs de Managed Service para Apache Spark, Managed Airflow, BigQuery y Cloud Storage:

Roles necesarios para habilitar las APIs

Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el serviceusage.services.enable permiso. Obtén más información para otorgar roles.

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Otorgar permisos

Otorga los siguientes roles y permisos a tu cuenta de usuario:

Crea y prepara tu entorno de Managed Airflow

  1. Crea un entorno de Managed Airflow con parámetros predeterminados:

    • Elige una región de EE.UU.
    • Elige la versión más reciente de Managed Airflow.
  2. Otorga los siguientes roles a la cuenta de servicio que se usa en tu entorno de Managed Airflow para que los trabajadores de Airflow ejecuten correctamente las tareas de DAG:

    • Usuario de BigQuery (roles/bigquery.user)
    • Propietario de datos de BigQuery (roles/bigquery.dataOwner)
    • Usuario de cuenta de servicio (roles/iam.serviceAccountUser)
    • Editor de Dataproc (roles/dataproc.editor)
    • Trabajador de Dataproc (roles/dataproc.worker)

Crea y modifica recursos relacionados en Google Cloud

  1. Instala el apache-airflow-providers-amazon paquete de PyPI en tu entorno de Managed Airflow.

  2. Crea un conjunto de datos vacío de BigQuery con los siguientes parámetros:

    • Nombre: holiday_weather
    • Región: US
  3. Crea un bucket de Cloud Storage nuevo en la multirregión US.

  4. Ejecuta el siguiente comando para habilitar el Acceso privado a Google en la subred predeterminada de la región en la que deseas ejecutar Managed Service para Apache Spark para cumplir con los requisitos de red. Te recomendamos que uses la misma región que tu entorno de Managed Airflow.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Crea recursos relacionados en AWS

Crea un bucket de S3 con la configuración predeterminada en la región que prefieras.

Conéctate a AWS desde Managed Airflow

  1. Obtén el ID de clave de acceso y la clave de acceso secreta de AWS.
  2. Agrega tu conexión de AWS S3 con la IU de Airflow:

    1. Ve a Administrador > Conexiones.
    2. Crea una conexión nueva con la siguiente configuración:

      • ID de conexión: aws_s3_connection
      • Tipo de conexión: Amazon S3
      • Extras (o JSON de campos adicionales): {"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

Procesamiento de datos con Managed Service para Apache Spark

En esta sección, se describe el procesamiento de datos con Managed Service para Apache Spark.

Explora el trabajo de PySpark de ejemplo

El siguiente código es un ejemplo de trabajo de PySpark que convierte la temperatura de décimas de grado Celsius a grados Celsius. Este trabajo convierte los datos de temperatura del conjunto de datos en un formato diferente.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Sube el archivo PySpark a Cloud Storage

Para subir el archivo PySpark a Cloud Storage, sigue estos pasos:

  1. Guarda data_analytics_process.py en tu máquina local.

  2. En la Google Cloud consola, ve a la página Navegador de Cloud Storage:

    Ir al navegador de Cloud Storage

  3. Haz clic en el nombre del bucket que creaste.

  4. En la pestaña Objetos del bucket, haz clic en el botón Subir archivos , selecciona data_analytics_process.py en el diálogo que aparece y haz clic en Abrir.

Sube el archivo CSV a AWS S3

Para subir el archivo holidays.csv, sigue estos pasos:

  1. Guarda holidays.csv en tu máquina local.
  2. Sigue la guía de AWS para subir el archivo a tu bucket.

DAG de análisis de datos

En esta sección, se describe cómo configurar y usar el DAG de análisis de datos.

Explora el DAG de ejemplo

El DAG usa varios operadores para transformar y unificar los datos:

  • El S3ToGCSOperator transfiere el archivo holidays.csv de tu bucket de AWS S3 a tu bucket de Cloud Storage.

  • El GCSToBigQueryOperator transfiere el archivo holidays.csv de Cloud Storage a una tabla nueva en el conjunto de datos holidays_weather de BigQuery que creaste antes.

  • El DataprocCreateBatchOperator crea y ejecuta un trabajo por lotes de PySpark con Managed Service para Apache Spark.

  • El BigQueryInsertJobOperator une los datos de holidays.csv en la columna "Date" con los datos meteorológicos del conjunto de datos públicos de BigQuery ghcn_d. Las tareas de BigQueryInsertJobOperator se generan de forma dinámica con un bucle for, y estas tareas están en un TaskGroup para mejorar la legibilidad en la vista de gráfico de la IU de Airflow.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Usa la IU de Airflow para agregar variables

En Airflow, las variables son una forma universal de almacenar y recuperar parámetros o configuraciones arbitrarias como un almacén de pares clave-valor simple. Este DAG usa variables de Airflow para almacenar valores comunes. Para agregarlas a tu entorno, sigue estos pasos:

  1. Accede a la IU de Airflow desde Google Cloud la consola.

  2. Ve a Administrador > Variables.

  3. Agrega las siguientes variables:

    • s3_bucket: el nombre del bucket de S3 que creaste antes.

    • gcp_project: ID del proyecto

    • gcs_bucket: el nombre del bucket que creaste antes (sin el prefijo gs://).

    • gce_region: la región en la que deseas que tu trabajo de Managed Service para Apache Spark cumpla con los requisitos de red de Managed Service para Apache Spark . Esta es la región en la que habilitaste el Acceso privado a Google antes.

    • dataproc_service_account: la cuenta de servicio de tu entorno de Managed Airflow. Puedes encontrar esta cuenta de servicio en la pestaña de configuración del entorno de tu entorno de Managed Airflow.

Sube el DAG al bucket de tu entorno

Managed Airflow programa los DAG que se encuentran en la carpeta /dags del bucket de tu entorno. Para subir el DAG con la Google Cloud consola, sigue estos pasos:

  1. En tu máquina local, guarda s3togcsoperator_tutorial.py.

  2. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  3. En la lista de entornos, en la columna Carpeta de DAG , haz clic en el vínculo DAG. Se abrirá la carpeta de DAG de tu entorno.

  4. Haz clic en Subir archivos.

  5. Selecciona s3togcsoperator_tutorial.py en tu máquina local y haz clic en Abrir.

Activa el DAG

  1. En tu entorno de Managed Airflow, haz clic en la pestaña DAGs.

  2. Haz clic en el ID de DAG s3_to_gcs_dag.

  3. Haz clic en Activar DAG.

  4. Espera entre cinco y diez minutos hasta que veas una marca de verificación verde que indique que las tareas se completaron correctamente.

Valida el éxito del DAG

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, haz clic en el nombre de tu proyecto.

  3. Haz clic en holidays_weather_joined.

  4. Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna de valores están en décimas de grado Celsius.

  5. Haz clic en holidays_weather_normalized.

  6. Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna de valores están en grados Celsius.

Limpieza

Borra los recursos individuales que creaste para este instructivo:

¿Qué sigue?