Crear canalizaciones de ingeniería de datos

En esta guía, se describe cómo crear y, luego, implementar una canalización de orquestación en la extensión del kit de agentes de datos de Google Cloud para Visual Studio Code.

La canalización de ejemplo ejecuta una secuencia de comandos de PySpark en Managed Service para Apache Spark.

Puedes implementar canalizaciones de orquestación desde VS Code como versiones locales o a través de una acción de GitHub, por ejemplo, cuando se combinan cambios en la rama main. En este documento, se muestra cómo implementar la versión local de una canalización de orquestación.

Antes de comenzar

Antes de comenzar, completa los siguientes pasos:

  1. Instala la extensión Data Agent Kit para VS Code.
  2. Configura tus parámetros.
  3. Agrega un repositorio de GitHub a tu espacio de trabajo de VS Code para almacenar canalizaciones de orquestación y recursos, como secuencias de comandos.

Revisa los roles de IAM obligatorios

Para obtener los permisos necesarios para crear recursos en tu proyecto, implementar y ejecutar canalizaciones de orquestación, pídele a tu administrador que te otorgue los roles requeridos.

Para crear y administrar entornos de Managed Service para Apache Airflow, y administrar objetos en sus buckets asociados, necesitas los siguientes roles. Para obtener más información sobre estos roles de usuario, consulta Otorga roles a los usuarios en la documentación de Managed Service para Apache Airflow.

  • Administrador de objetos de almacenamiento y entorno (composer.environmentAndStorageObjectAdmin)
  • Usuario de cuenta de servicio (iam.serviceAccountUser)

Para trabajar con recursos de BigQuery y Cloud Storage, necesitas los siguientes roles.

  • Editor de datos de BigQuery (roles/bigquery.dataEditor)
  • Storage Object Admin (roles/storage.objectAdmin)

Según los recursos a los que planeas acceder, es posible que necesites roles adicionales más allá de los que te permiten usar la extensión y trabajar con las canalizaciones de orquestación.

Crea una cuenta de servicio y otórgale roles de IAM

Usa una cuenta de servicio única para el entorno de Airflow administrado (3ª gen.). La cuenta de servicio crea un entorno de Managed Airflow Gen 3 y ejecuta todas las canalizaciones de organización que implementes.

Pídele a tu administrador que complete los siguientes pasos:

  1. Crea una cuenta de servicio como se describe en la documentación de IAM.
  2. Otorga el rol de Trabajador de Composer (composer.worker) a la cuenta de servicio. Este rol proporciona los permisos necesarios en la mayoría de los casos.

Como práctica recomendada, si necesitas acceder a otros recursos en tuGoogle Cloud proyecto, otorga permisos adicionales a esta cuenta de servicio solo cuando sea necesario para la operación de la canalización de orquestación.

Crea recursos Google Cloud para tu canalización de orquestación

En este paso, crearás recursos Google Cloud para tu canalización de orquestación.

Crear un entorno de Airflow administrado (3ª gen.)

Crea un entorno de Airflow administrado (3ª gen.) con la siguiente configuración:

  • Nombre del entorno: Ingresa un nombre que usarás más adelante para configurar la canalización de orquestación. Por ejemplo, example-pipeline-scheduler
  • Ubicación: Selecciona una ubicación. Recomendamos crear todos los recursos de esta guía en la misma ubicación. Por ejemplo, us-central1
  • Cuenta de servicio: Selecciona la cuenta de servicio que creaste para este entorno.

En el siguiente ejemplo de comando de Google Cloud CLI, se muestra la sintaxis:

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

Agrega parámetros de entorno a la configuración del programador

Proporciona los detalles de conexión para el entorno de Managed Airflow que ejecutará tu canalización de orquestación.

Agrega los parámetros de configuración del entorno que creaste con el editor de configuración del kit del agente de datos de Google Cloud:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Configuración y, luego, haz clic en Configuración.
  3. Selecciona Scheduler.
  4. Ingresa los parámetros del entorno de Airflow administrado de 3ª gen. que creaste antes:
    • ID del proyecto: Es el nombre del proyecto en el que se encuentra el entorno. Ejemplo: example-project.
    • Región: Es la región en la que se encuentra el entorno. Ejemplo: us-central1.
    • Entorno: Es el nombre del entorno. Ejemplo: example-pipeline-scheduler.
  5. Haz clic en Guardar.

Crea un bucket para los artefactos de la canalización

Crea un bucket de Cloud Storage en el mismo proyecto que el entorno de Airflow administrado y asígnale un nombre similar a example-pipelines-bucket. Este bucket es necesario para almacenar tu trabajo de Managed Service para Apache Spark.

Algunas acciones de la canalización, como el envío de los resultados a un bucket de Cloud Storage

Crea un nuevo conjunto de datos y una tabla en BigQuery

En esta guía, se muestra una canalización que escribe datos en una tabla de BigQuery. Crea los siguientes recursos de BigQuery en tu proyecto:

  1. Crea un conjunto de datos nuevo llamado wordcount_dataset.
  2. Crea una tabla de BigQuery nueva llamada wordcount_output.

Agrega recursos de canalización

En esta guía, se muestra una tarea común de ingeniería de datos (ETL: extracción, transformación y carga) con PySpark, que lee datos de BigQuery, los transforma (recuento de palabras) y los vuelve a cargar en BigQuery.

Sin agente

Agrega el siguiente archivo a la carpeta /scripts de tu repositorio. Más adelante, agregarás una acción de canalización que ejecute esta secuencia de comandos en Managed Service para Apache Spark.

Archivo wordcount.py de ejemplo:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

Reemplaza lo siguiente:

  • ARTIFACTS_BUCKET_NAME: Es el nombre del bucket de Cloud Storage que creaste. Ejemplo: example-pipelines-bucket.
  • PROJECT_ID: Es el nombre del proyecto en el que reside el entorno. Ejemplo: example-project.

Agentic

Pídele al agente que genere una muestra de secuencia de comandos de PySpark en la carpeta /scripts de tu repositorio. Más adelante, agregarás una acción de canalización que ejecute este script en Managed Service para Apache Spark.

Ingresa una instrucción similar a la siguiente:

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

Inicializa canalizaciones de organización en tu repositorio

Cuando inicializas canalizaciones de orquestación, la extensión Data Agent Kit para VS Code crea un andamiaje que incluye lo siguiente:

  • Un archivo YAML de canalización de orquestación: Es un ejemplo de definición de canalización que contiene una programación, pero no acciones definidas.
  • deployment.yaml: Es un ejemplo de configuración de implementación de canalización que define cómo se debe implementar tu canalización. En este archivo, se muestra la configuración requerida para el entorno de Managed Airflow, el bucket de artefactos y cualquier otro recurso que usen las acciones de tu canalización.
  • .github/workflows/deploy.yaml: Configura una acción de GitHub que implementa tu canalización cuando combinas cambios en la rama main de tu repositorio de GitHub.
  • .github/workflows/validate.yaml: Configura una acción de GitHub que valida tu canalización después de que se implementa.

En los pasos posteriores de este documento, expandirás estas definiciones con la extensión Data Agent Kit para VS Code para crear e implementar una canalización de orquestación de forma local.

Sin agente

Para inicializar las canalizaciones de orquestación, haz lo siguiente:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Ingeniería de datos y, luego, haz clic en Inicializar canalización de orquestación.
  3. Ingresa los parámetros de la nueva canalización de orquestación:
  4. ID de canalización: Ingresa el ID de tu canalización. Ejemplo: example-pipeline.
  5. ID del proyecto de Google Cloud: Es el nombre del proyecto en el que reside el entorno. Ejemplo: example-project.
  6. Región: Es la región en la que reside tu entorno. Ejemplo: us-central1.
  7. ID del entorno: Es el nombre del entorno con el que deseas desarrollar. Ejemplo: dev/staging.
  8. Scheduler Managed Service for Apache Airflow Environment: Es el nombre del entorno en el que deseas organizar tus canalizaciones. Para este documento, especifica el mismo entorno en este parámetro.

  9. Artifacts Bucket: Es el nombre del bucket que se usa para los artefactos de la canalización, sin el prefijo gs://. Ejemplo: example-pipelines-bucket.

  10. Haz clic en Siguiente.

  11. Haz clic en Inicializar.

  12. Especifica un espacio de trabajo en el que deseas inicializar la canalización.

Agentic

Pídele al agente que cree un andamio para las canalizaciones de organización de tu repositorio.

Ingresa una instrucción similar a la siguiente:

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

Después de inicializar las canalizaciones en tu repositorio, no podrás volver a hacerlo, ya que el nuevo andamiaje sobrescribirá los cambios de configuración que hayas realizado. Puedes agregar canalizaciones nuevas creando archivos de definición de canalización nuevos en tu proyecto y agregándolos a la configuración de implementación.

Agrega una tarea nueva a la canalización

Como la configuración inicial de la canalización no tiene ninguna acción, le agregarás una acción que ejecute tu secuencia de comandos de PySpark.

Sin agente

Para editar una canalización, haz lo siguiente:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Ingeniería de datos y, luego, Canalizaciones de organización.
  3. Selecciona example-pipeline.yaml. Se abrirá un editor de canalizaciones para la canalización seleccionada.
  4. Opcional: Selecciona el nodo Schedule trigger. Puedes ajustar la programación de tu canalización especificando una expresión similar a cron y las horas de inicio y finalización de la programación. El programa predeterminado para la canalización recién inicializada es 0 2 * * *, que se ejecuta a las 2 a.m. todos los días.
  1. Agregar una nueva tarea En esta guía, agregarás una tarea de PySpark que ejecuta un script de PySpark que agregaste anteriormente:

    1. Haz clic en Agregar primera tarea para agregar un nodo de tarea nuevo.
    2. Selecciona Ejecutar secuencia de comandos de PySpark y el archivo script/wordcount.py.

    Se abrirá el panel Ejecutar secuencia de comandos de PySpark.

    1. En el modo de clúster de Spark, selecciona Serverless Spark.
    2. En Ubicación, especifica la ubicación en la que reside tu entorno. Ejemplo: us-central1.
    3. Haz clic en Guardar.

Agentic

Ejecuta la siguiente instrucción:

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

Implementa la versión local de la canalización

Implementa la versión local de la canalización para confirmar que esté configurada correctamente.

Cuando implementas una versión local de la canalización de orquestación, la extensión Data Agent Kit para VS Code sube una versión local del paquete de la canalización al entorno de Airflow administrado y la ejecuta. La implementación local está diseñada para usarse cuando se trabaja en un entorno de desarrollo.

El comando de implementación implementa un programa que no está en pausa. Para evitar esto, puedes pausar la programación manualmente en el panel Administración de canalizaciones. También puedes editar el archivo YAML de tu canalización para comentar o quitar el bloque triggers: - schedule.

Sin agente

Para implementar una versión local de la canalización de orquestación de ejemplo, haz lo siguiente:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Ingeniería de datos y, luego, Canalizaciones de organización.
  3. Selecciona example-pipeline.yaml. Se abrirá un editor de canalizaciones para la canalización seleccionada.
  4. Selecciona Ejecutar canalización y, luego, selecciona el entorno de desarrollo o de pruebas que creaste anteriormente.

Agentic

Ejecuta la siguiente instrucción:

Deploy my pipeline

Supervisa la ejecución de la canalización y comprueba los registros de ejecución

Después de implementar tu canalización, puedes ver la información detallada, el historial de las ejecuciones de la canalización y los registros de ejecución de la canalización:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Ingeniería de datos y, luego, selecciona Administración de canalizaciones.
  3. Haz clic en el nombre de tu canalización (example-pipeline) para ver su historial de ejecución. En la lista de ejecuciones para una fecha específica, puedes ver las ejecuciones de canalizaciones individuales y el desglose de las acciones individuales dentro de cada ejecución de canalización.
  4. Haz clic en un ID de tarea para ver los registros de ejecución de la tarea. Dado que la secuencia de comandos de PySpark de ejemplo se ejecutó en Managed Service para Apache Spark, los registros de tareas tendrán un vínculo a los registros de Batch.

Soluciona problemas y corrige errores de canalizaciones

Cuando falla tu canalización, verás un botón Diagnose en el panel Pipelines management.

Agentic

Cuando haces clic en el botón Diagnose, el agente genera una instrucción para solucionar la falla de la canalización. La instrucción se copia en el portapapeles o se abre en una nueva sesión de chat.

El agente usa habilidades especializadas para solucionar problemas de canalizaciones, enfocándose en recopilar registros, verificar el código implementado y el espacio de trabajo, y generar un análisis de la causa raíz (RCA).

Estos son los posibles próximos pasos después de recibir el RCA:

  • Aplica el análisis de causa raíz en el espacio de trabajo actual.
  • Pídele al agente que cree una rama nueva y aplique los cambios allí.
  • Abre un ticket de Atención al cliente de Cloud con los detalles del RCA.

Para obtener ayuda para solucionar problemas con la extensión, consulta Soluciona problemas de la extensión Data Agent Kit para VS Code.

¿Qué sigue?