使用 Dataflow 為 AlloyDB 建構即時向量嵌入管道

本文說明如何使用 Dataflow 建立 PostgreSQL 適用的 AlloyDB「擷取、轉換及載入」(ETL) 管道。 Google Cloud Dataflow 是全代管 Google Cloud 服務,用於開發及執行資料處理管道。

您可以按照文件中的操作說明,以 Vector Embedding Ingestion with Apache Beam and AlloyDB Colab 為基礎,使用 Python 建立 basic_ingestion_pipeline.py 擷取管道。您可以將本文資訊用於語意搜尋或檢索增強生成 (RAG) 等用途。

這些操作說明會介紹下列 Dataflow 管道元件:

  • 設定 AlloyDB 和 Dataflow 連線
  • 使用 Apache Beam VertexAITextEmbeddings 處理常式和 Vertex AI 文字嵌入模型,在 PostgreSQL 適用的 AlloyDB 中生成嵌入
  • 在 Dataflow 中建立串流管道

事前準備

使用 Colab 建立 Dataflow 管道前,請先完成下列必要條件:

設定 AlloyDB for PostgreSQL 執行個體和管道元件

首先,請設定管道,連線至 AlloyDB for PostgreSQL 執行個體。這項設定包括定義 Google Cloud 專案 ID、PostgreSQL 適用的 AlloyDB 執行個體 URI、使用者和密碼,以便使用 AlloyDB 語言連接器連線。如要進一步瞭解如何設定連線,請參閱「資料庫設定」。

檢索增強生成 (RAG) 專用的 Apache Beam 模組提供下列工作的類別:

  • 從 PostgreSQL 適用的 AlloyDB 擷取資料
  • 生成嵌入項目
  • 將這些向量嵌入內容寫回 PostgreSQL 適用的 AlloyDB

建構管道邏輯前,請先將必要類別匯入管道程式碼。如要進一步瞭解管道元件,請參閱「匯入管道元件」。

建立樣本資料

使用 Apache Beam 和 AlloyDB 擷取向量嵌入 Colab 提供範例 products_data 資料,用於執行管道。管道會將這項範例資料和嵌入模型做為輸入內容,產生嵌入。

詳情請參閱「建立範例資料」。

建立資料表來儲存嵌入

管道會將產生的嵌入儲存在 default_dataflow_product_embeddings 資料表中。如要進一步瞭解如何建立資料表結構定義,請參閱「建立含有預設結構定義的資料表」。

選用:準備要擷取的嵌入內容資料

根據資料集,您可以將資料分割為中繼資料和文字,嵌入模型必須將這些資料轉換為嵌入項目。MLTransform()VectorDatabaseWriteTransform() 類別會將輸入資料處理成嵌入模型支援的大小。請根據所用嵌入模型的規格,加入中繼資料並設定輸入資料格式。

如要進一步瞭解如何準備資料,請參閱「將產品資料對應至區塊」。

設定嵌入處理常式來產生嵌入

VertexAITextEmbeddings() 類別會定義建立向量嵌入項目的文字嵌入模型。這個嵌入模型會將分塊資料轉換為嵌入。

詳情請參閱「設定嵌入處理常式」。

您也可以使用以 Huggingface SentenceTransformers 架構建立的預先訓練模型,生成向量嵌入。詳情請參閱「使用 HuggingFace 生成嵌入內容」。

建立擷取管道

basic_ingestion_pipeline.py 管道 (位於 Vector Embedding Ingestion with Apache Beam and AlloyDB Colab 中) 會納入先前各節的設定,包括 AlloyDB for PostgreSQL 設定、將資料載入 AlloyDB for PostgreSQL、選用資料分塊,以及嵌入處理常式設定。

擷取管道會執行下列作業:

  • 建立產品資料表
  • 將資料轉換為區塊
  • 生成嵌入
  • 將轉換後的嵌入寫入 PostgreSQL 適用的 AlloyDB 中的 products_data 資料表

您可以使用直接本機執行器或雲端執行器 (例如 Dataflow) 執行這個管道。

如要進一步瞭解如何建立擷取管道,請參閱「將管道儲存至 Python 檔案」。

執行 Dataflow 管道

您可以透過指令列執行 Dataflow 管道。傳遞憑證,例如專案 ID、PostgreSQL 適用的 AlloyDB 連線詳細資料、Cloud Storage 值區位置、執行環境詳細資料、網路資訊,以及擷取管道的名稱 (basic_ingestion_pipeline.py)。

在「使用 Apache Beam 和 AlloyDB 擷取向量嵌入內容」Colab 中,PostgreSQL 適用的 AlloyDB 執行個體和 Dataflow 工作會在相同的虛擬私有雲網路和子網路中執行。

如要進一步瞭解如何在 Dataflow 中執行管道,請參閱「在 Dataflow 上執行管道」。

在 Google Cloud 控制台的 Dataflow 資訊主頁中,您可以在管道執行時查看執行圖、記錄和指標。

選用:執行串流 Dataflow 管道

如果資料預計會經常變更 (例如相似性搜尋或推薦引擎),請考慮使用 Dataflow 和 Pub/Sub 建立串流管道。

這個管道不會處理批次資料,而是持續從 Pub/Sub 主題讀取傳入的訊息、將訊息轉換為區塊、使用指定模型 (例如 Hugging Face 或 Vertex AI) 產生嵌入內容,並更新 AlloyDB for PostgreSQL 表格。

詳情請參閱「從 Pub/Sub 串流嵌入更新」。

驗證 AlloyDB for PostgreSQL 中的向量嵌入

管道執行完畢後,請確認管道是否已將嵌入寫入 AlloyDB for PostgreSQL 資料庫。

詳情請參閱「驗證手寫內嵌內容」。

後續步驟