Crear una canalización de incrustaciones de vectores en tiempo real para AlloyDB con Dataflow

En este documento se explica cómo crear una canalización de extracción, transformación y carga (ETL) de AlloyDB para PostgreSQL mediante Dataflow. Google Cloud Dataflow es un servicio totalmente gestionado Google Cloud para desarrollar y ejecutar canalizaciones de procesamiento de datos.

Puedes seguir las instrucciones del documento, que se basan en el cuaderno de Colab Vector Embedding Ingestion with Apache Beam and AlloyDB (Ingestión de incrustaciones de vectores con Apache Beam y AlloyDB), que usa Python para crear la canalización de ingestión basic_ingestion_pipeline.py. Algunos de los casos prácticos en los que puedes aplicar la información de este documento son la búsqueda semántica o la generación aumentada por recuperación (RAG).

En estas instrucciones se describen los siguientes componentes de la canalización de Dataflow:

  • Configurar una conexión de AlloyDB y Dataflow
  • Generar embeddings en AlloyDB para PostgreSQL con el controlador VertexAITextEmbeddings de Apache Beam y el modelo de embeddings de texto de Vertex AI
  • Crear una canalización de streaming en Dataflow

Antes de empezar

Antes de crear el flujo de procesamiento de Dataflow con Colab, completa estos requisitos previos:

Configurar la instancia de AlloyDB para PostgreSQL y los componentes de la canalización

Primero, configura tu canalización para que se conecte a una instancia de AlloyDB para PostgreSQL. Esta configuración incluye la definición del ID de proyecto, el URI de la instancia de AlloyDB para PostgreSQL, el usuario y la contraseña para conectarse mediante el conector de lenguaje de AlloyDB. Google Cloud Para obtener más información sobre cómo configurar la conexión, consulta Configuración de la base de datos.

Los módulos de Apache Beam específicos de la generación aumentada de recuperación (RAG) proporcionan clases para las siguientes tareas:

  • Ingerir datos de AlloyDB para PostgreSQL
  • Generar inserciones
  • Escribir estas inserciones de vectores en AlloyDB para PostgreSQL

Importa las clases necesarias en el código de tu canalización antes de crear la lógica de la canalización. Para obtener más información sobre los componentes de la canalización, consulta Importar componentes de la canalización.

Crear datos de muestra

El Colab Ingestión de inserciones de vectores con Apache Beam y AlloyDB proporciona datos de products_data de ejemplo para ejecutar el flujo de procesamiento. El flujo de procesamiento usa estos datos de muestra como entrada, junto con el modelo de inserción, para generar inserciones.

Para obtener más información, consulta Crear datos de ejemplo.

Crear una tabla para almacenar las inserciones

La canalización almacena las inserciones generadas en la tabla default_dataflow_product_embeddings. Para obtener más información sobre cómo crear el esquema de la tabla, consulta Crear una tabla con el esquema predeterminado.

Opcional: Preparar los datos para la ingestión de inserciones

En función de tu conjunto de datos, puedes dividir los datos en metadatos y texto que el modelo de inserciones debe convertir en inserciones. Las clases MLTransform() y VectorDatabaseWriteTransform() procesan los datos de entrada para que tengan un tamaño compatible con el modelo de inserciones. Incluye los metadatos y da formato a los datos de entrada según las especificaciones del modelo de embedding que estés usando.

Para obtener más información sobre cómo preparar los datos, consulta Asignar datos de productos a fragmentos.

Configurar el controlador de inserciones para generar inserciones

La clase VertexAITextEmbeddings() define el modelo de inserción de texto que crea inserciones vectoriales. Este modelo de incrustación convierte los datos fragmentados en incrustaciones.

Para obtener más información, consulta Configurar Embedding Handler.

También puedes usar un modelo preentrenado creado con el framework SentenceTransformers de Hugging Face para generar incrustaciones de vectores. Para obtener más información, consulta Generar embeddings con Hugging Face.

Crear un flujo de procesamiento de ingestión

La canalización basic_ingestion_pipeline.py, que se proporciona en el Colab Ingestión de inserciones vectoriales con Apache Beam y AlloyDB, incorpora las configuraciones de las secciones anteriores, como la configuración de AlloyDB para PostgreSQL, la carga de datos en AlloyDB para PostgreSQL, la fragmentación de datos opcional y la configuración del controlador de inserciones.

La canalización de ingesta hace lo siguiente:

  • Crea tablas de datos de producto
  • Convierte datos en fragmentos.
  • Genera embeddings
  • Escribe las inserciones convertidas en la tabla products_data de AlloyDB para PostgreSQL

Puedes ejecutar este flujo de procesamiento con un ejecutor local directo o con un ejecutor basado en la nube, como Dataflow.

Para obtener más información sobre cómo crear la canalización de ingestión, consulta Guardar la canalización en un archivo de Python.

Ejecutar el flujo de procesamiento de Dataflow

Puedes ejecutar una canalización de Dataflow desde la línea de comandos. Transfiere las credenciales, como el ID de tu proyecto, los detalles de conexión de AlloyDB para PostgreSQL, la ubicación del segmento de Cloud Storage, los detalles del entorno de ejecución, la información de la red y el nombre de la canalización de ingestión (basic_ingestion_pipeline.py).

En el Colab Ingestión de embeddings de vectores con Apache Beam y AlloyDB, la instancia de AlloyDB para PostgreSQL y los trabajos de Dataflow se ejecutan en la misma red VPC y subred.

Para obtener más información sobre cómo ejecutar una canalización en Dataflow, consulta Ejecutar una canalización en Dataflow.

En la Google Cloud consola, en el panel de control de Dataflow, puedes ver gráficos de ejecución, registros y métricas mientras se ejecuta tu flujo de procesamiento.

Opcional: Ejecutar el flujo de procesamiento de Dataflow de streaming

En el caso de los datos que se espera que cambien con frecuencia, como las búsquedas de similitud o los motores de recomendaciones, te recomendamos que crees una canalización de streaming con Dataflow y Pub/Sub.

En lugar de procesar un lote de datos, esta canalización lee continuamente los mensajes entrantes de un tema de Pub/Sub, los convierte en fragmentos, genera las inserciones mediante un modelo especificado (como Hugging Face o Vertex AI) y actualiza la tabla de AlloyDB para PostgreSQL.

Para obtener más información, consulta Actualizaciones de inserciones de streaming de Pub/Sub.

Verificar las inserciones vectoriales en AlloyDB para PostgreSQL

Una vez que se haya ejecutado la canalización, comprueba que haya escrito las inserciones en tu base de datos de AlloyDB para PostgreSQL.

Para obtener más información, consulta Verificar las inserciones escritas.

Siguientes pasos