Crear canalizaciones de ingeniería de datos

En esta guía, se describe cómo crear e implementar una canalización de organización en la extensión Google Cloud Data Agent Kit para Visual Studio Code.

La canalización de ejemplo ejecuta una secuencia de comandos de PySpark en Managed Service para Apache Spark.

Puedes implementar canalizaciones de organización desde VS Code como versiones locales o a través de una acción de GitHub, como cuando se combinan cambios en la rama main. En este documento, se muestra cómo implementar la versión local de una canalización de organización.

Antes de comenzar

Antes de comenzar, completa lo siguiente:

  1. Instala la extensión Data Agent Kit para VS Code.
  2. Define tu configuración.
  3. Agrega un repositorio de GitHub a tu espacio de trabajo de VS Code para almacenar canalizaciones de organización y recursos, como secuencias de comandos.

Revisa los roles de IAM necesarios

Para obtener los permisos para crear recursos en tu proyecto, implementar y ejecutar canalizaciones de organización, pídele a tu administrador que te otorgue los roles necesarios.

Para crear y administrar entornos de Managed Service para Apache Airflow y administrar objetos en sus buckets asociados, necesitas los siguientes roles. Si quieres obtener más información sobre estos roles de usuario, consulta Otorgar roles a los usuarios en la documentación de Managed Service para Apache Airflow.

  • Administrador de objetos de almacenamiento y entorno (composer.environmentAndStorageObjectAdmin)
  • Usuario de cuenta de servicio (iam.serviceAccountUser)

Para trabajar con recursos de BigQuery y Cloud Storage, necesitas los siguientes roles.

  • Editor de datos de BigQuery (roles/bigquery.dataEditor)
  • Storage Object Admin (roles/storage.objectAdmin)

Según los recursos a los que planeas acceder, es posible que necesites roles adicionales más allá de los que te permiten usar la extensión y trabajar con canalizaciones de organización.

Crea una cuenta de servicio y otórgale roles de IAM

Usa una cuenta de servicio única para el entorno de Managed Airflow (3ª gen.). La cuenta de servicio crea un entorno de Managed Airflow (3ª gen.) y ejecuta todas las canalizaciones de organización que implementes.

Pídele a tu administrador que complete los siguientes pasos:

  1. Crea una cuenta de servicio como se describe en la documentación de IAM.
  2. Otórgale a la cuenta de servicio el rol Trabajador de Composer (composer.worker). Este rol proporciona los permisos necesarios en la mayoría de los casos.

Como práctica recomendada, si necesitas acceder a otros recursos de tu Google Cloud proyecto, otórgale permisos adicionales a esta cuenta de servicio solo cuando sea necesario para la operación de la Canalizaciones de organización.

Crear Google Cloud recursos para tu canalización de organización

En este paso, crea Google Cloud recursos para tu canalización de organización.

Crear un entorno de Managed Airflow (3ª gen.)

Crea un entorno de Managed Airflow (3ª gen.) con la siguiente configuración:

  • Nombre del entorno: Ingresa un nombre que usarás más adelante para configurar la canalización de organización. Por ejemplo, example-pipeline-scheduler.
  • Ubicación: Selecciona una ubicación. Te recomendamos que crees todos los recursos de esta guía en la misma ubicación. Por ejemplo, us-central1.
  • Cuenta de servicio: Selecciona la cuenta de servicio que creaste para este entorno.

En el siguiente ejemplo de comando de Google Cloud CLI, se muestra la sintaxis:

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

Agrega parámetros de entorno a la configuración del programador

Proporciona los detalles de conexión del entorno de Managed Airflow que ejecutará tu canalización de organización.

Agrega los parámetros de configuración del entorno que creaste con el editor de configuración de Google Cloud Data Agent Kit:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Configuración y, luego, haz clic en Configuración.
  3. Selecciona Programador.
  4. Ingresa los parámetros del entorno de Managed Airflow (3ª gen.) que creaste antes:
    • ID del proyecto: Es el nombre del proyecto en el que se encuentra el entorno. Ejemplo: example-project.
    • Región: Es la región en la que se encuentra el entorno. Ejemplo: us-central1.
    • Entorno: Es el nombre del entorno. Ejemplo: example-pipeline-scheduler.
  5. Haz clic en Guardar.

Crea un bucket para los artefactos de la canalización

Crea un bucket de Cloud Storage en el mismo proyecto que el entorno de Managed Airflow y asígnale un nombre similar a example-pipelines-bucket. Este bucket es necesario para almacenar tu trabajo de Managed Service para Apache Spark.

Algunas acciones de canalización, como la salida, los resultados a un bucket de Cloud Storage.

Crea un conjunto de datos y una tabla nuevos en BigQuery

En esta guía, se muestra una canalización que escribe datos en una tabla de BigQuery. Crea los siguientes recursos de BigQuery en tu proyecto:

  1. Crea un conjunto de datos nuevo llamado wordcount_dataset.
  2. Crea una tabla de BigQuery nueva llamada wordcount_output.

Agrega recursos de canalización

En esta guía, se muestra una tarea común de ingeniería de datos (ETL: extraer, transformar y cargar) con PySpark, leer desde BigQuery, transformar los datos (recuento de palabras) y volver a cargarlos en BigQuery.

Non-agentic

Agrega el siguiente archivo a la carpeta /scripts de tu repositorio. Más adelante, agregarás una acción de canalización que ejecute esta secuencia de comandos en Managed Service para Apache Spark.

Archivo wordcount.py de ejemplo:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

Reemplaza lo siguiente:

  • ARTIFACTS_BUCKET_NAME: Es el nombre del bucket de Cloud Storage que creaste antes. Ejemplo: example-pipelines-bucket.
  • PROJECT_ID: Es el nombre del proyecto en el que reside el entorno. Ejemplo: example-project.

Agentic

Pídele al agente que genere una secuencia de comandos de PySpark de muestra en la carpeta /scripts de tu repositorio. Más adelante, agregarás una acción de canalización que ejecute esta secuencia de comandos en Managed Service para Apache Spark.

Ingresa una instrucción similar a la siguiente:

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

Inicializa canalizaciones de organización en tu repositorio

Cuando inicializas canalizaciones de organización, la extensión Data Agent Kit para VS Code crea una estructura que incluye lo siguiente:

  • Un archivo YAML de canalización de organización: Una definición de canalización de ejemplo que contiene una programación, pero no acciones definidas.
  • deployment.yaml: Un ejemplo de configuración de implementación de canalización que define cómo se debe implementar tu canalización. En este archivo, se muestra la configuración necesaria para el entorno de Managed Airflow, el bucket de artefactos y cualquier otro recurso que usen las acciones de tu canalización.
  • .github/workflows/deploy.yaml: Configura una acción de GitHub que implementa tu canalización cuando combinas cambios en la rama main de tu repositorio de GitHub.
  • .github/workflows/validate.yaml: Configura una acción de GitHub que valida tu canalización después de que se implementa.

En los pasos posteriores de este documento, expandirás estas definiciones con la extensión Data Agent Kit para VS Code para crear e implementar una canalización de organización de forma local.

Non-agentic

Para inicializar canalizaciones de organización, haz lo siguiente:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Ingeniería de datos y, luego, haz clic en Inicializar canalización de organización .
  3. Ingresa los parámetros de la nueva canalización de organización:
  4. ID de la canalización: Ingresa el ID de tu canalización. Ejemplo: example-pipeline.
  5. ID del proyecto de Google Cloud: Es el nombre del proyecto en el que reside el entorno reside. Ejemplo: example-project.
  6. Región: Es la región en la que reside tu entorno. Ejemplo: us-central1.
  7. ID del entorno: Es el nombre del entorno con el que deseas desarrollar. Ejemplo: dev/staging.
  8. Entorno de Managed Service para Apache Airflow del programador: Es el nombre de l entorno en el que deseas organizar tus canalizaciones. Para este documento, especifica el mismo entorno en este parámetro.

  9. Bucket de artefactos: Es el nombre del bucket que se usa para los artefactos de la canalización, sin el prefijo gs://. Ejemplo: example-pipelines-bucket.

  10. Haz clic en Siguiente.

  11. Haz clic en Inicializar.

  12. Especifica un espacio de trabajo en el que deseas que se inicialice la canalización.

Agentic

Pídele al agente que cree una estructura para las canalizaciones de organización de tu repositorio.

Ingresa una instrucción similar a la siguiente:

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

Después de inicializar las canalizaciones en tu repositorio, no podrás volver a hacerlo porque la nueva estructura anulará los cambios de configuración que hayas realizado. Para agregar canalizaciones nuevas, crea archivos de definición de canalización nuevos en tu proyecto y agrégalos a la configuración de implementación.

Agrega una nueva tarea a la canalización

Como la configuración inicial de la canalización no tiene ninguna acción, le agregas una acción que ejecuta tu secuencia de comandos de PySpark.

Non-agentic

Para editar una canalización, haz lo siguiente:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expanda Ingeniería de datos y, luego, Canalizaciones de organización.
  3. Selecciona example-pipeline.yaml. Se abrirá un editor de canalizaciones para la canalización seleccionada.
  4. Opcional: Selecciona el nodo Activar programación. Puedes ajustar la programación de tu canalización si especificas una expresión similar a cron y programas las horas de inicio y finalización. La programación predeterminada para la canalización recién inicializada es 0 2 * * *, que se ejecuta todos los días a las 2 a.m.
  1. Agrega una nueva tarea. En esta guía, agregarás una tarea de PySpark que ejecuta una secuencia de comandos de PySpark que agregaste antes:

    1. Haz clic en Agregar primera tarea para agregar un nodo de tarea nuevo.
    2. Selecciona Ejecutar secuencia de comandos de PySpark y el archivo script/wordcount.py.

    Se abrirá el panel Ejecutar secuencia de comandos de PySpark.

    1. En el modo de clúster de Spark, selecciona Spark sin servidores.
    2. En Ubicación, especifica la ubicación en la que reside tu entorno. Ejemplo: us-central1.
    3. Haz clic en Guardar.

Agentic

Ejecuta la siguiente instrucción:

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

Implementa la versión local de la canalización

Implementa la versión local de la canalización para confirmar que esté configurada correctamente.

Cuando implementas una versión local de las Canalizaciones de organización, la extensión Data Agent Kit para VS Code sube una versión local del paquete de canalización al entorno de Managed Airflow y la ejecuta. La implementación local está diseñada para usarse cuando se trabaja en un entorno de desarrollo.

El comando de implementación implementa una programación sin pausar. Para evitar esto, puedes pausar la programación de forma manual en el panel Administración de canalizaciones. También puedes editar tu archivo YAML de canalización para comentar o quitar el bloque triggers: - schedule.

Non-agentic

Para implementar una versión local de la canalización de organización de ejemplo, haz lo siguiente:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Ingeniería de datos y, luego, Canalizaciones de organización.
  3. Selecciona example-pipeline.yaml. Se abrirá un editor de canalizaciones para la canalización seleccionada.
  4. Selecciona Ejecutar canalización y, luego, selecciona el entorno de desarrollo o de etapa de pruebas que creaste antes.

Agentic

Ejecuta la siguiente instrucción:

Deploy my pipeline

Supervisa la ejecución de la canalización y verifica los registros de ejecución

Después de implementar tu canalización, puedes ver la información detallada, el historial de ejecuciones de canalización y los registros de ejecución de la canalización:

  1. Haz clic en el ícono de Google Cloud Data Agent Kit en la barra de actividades.
  2. Expande Ingeniería de datos y, luego, selecciona Administración de canalizaciones.
  3. Haz clic en el nombre de tu canalización (example-pipeline) para ver su historial de ejecución. En la lista de ejecuciones para una fecha específica, puedes ver ejecuciones de canalización individuales y el desglose de acciones individuales dentro de cada ejecución de canalización.
  4. Haz clic en un ID de tarea para ver los registros de ejecución de la tarea. Como la secuencia de comandos de PySpark de ejemplo se ejecutó en Managed Service para Apache Spark, los registros de tareas tendrán un vínculo a los registros por lotes.

Soluciona problemas y corrige fallas de canalización

Cuando falla tu canalización, verás un botón Diagnosticar en el panel Administración de canalizaciones.

Agentic

Cuando haces clic en el botón Diagnosticar, el agente genera una instrucción para solucionar el problema de la canalización. La instrucción se copia en el portapapeles o se abre en una sesión de chat nueva.

El agente usa habilidades especializadas para solucionar problemas de canalizaciones, enfocándose en recopilar registros, verificar el código implementado y el espacio de trabajo, y generar un análisis de causa raíz (RCA).

Los posibles pasos siguientes después de recibir el RCA son los siguientes:

  • Aplica el análisis de causa raíz en el espacio de trabajo actual.
  • Pídele al agente que cree una rama nueva y aplique los cambios allí.
  • Abre un ticket de Atención al cliente de Cloud con los detalles del RCA.

Si necesitas ayuda para solucionar problemas con la extensión, consulta Solución de problemas.

¿Qué sigue?