Migrate data from a vector database to AlloyDB

This tutorial describes how to migrate data from a third-party vector database to AlloyDB for PostgreSQL by using a LangChain VectorStore. It assumes that the data in the third-party vector database was created by using a LangChain VectorStore integration. If you put data into one of the following databases without using LangChain, you might need to edit the scripts provided below to match your data's schema. The following vector databases are supported:
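
  • Pinecone
  • Weaviate
  • Chroma
  • Qdrant
  • Milvus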

This tutorial assumes that you are familiar with Google Cloud, AlloyDB, and asynchronous Python programming.

Prerequisites

Make sure that you have one of the following LangChain third-party vector stores:
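
  • Pinecone
  • Weaviate
  • Chroma
  • Qdrant
  • Milvus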

Enable billing and the required APIs

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud APIs that are required to create and connect to AlloyDB for PostgreSQL.

    Enable the APIs

    1. In the Confirm project step, click Next to confirm the name of the project that you are going to make changes to.
    2. In the Enable APIs step, click Enable to enable the following (or use the gcloud sketch after this list):

      • AlloyDB API
      • Compute Engine API
      • Service Networking API
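
      Alternatively, as a sketch (assuming the gcloud CLI is installed and authenticated against your project), you can enable the same APIs from the command line:

      gcloud services enable alloydb.googleapis.com \
          compute.googleapis.com \
          servicenetworking.googleapis.com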

Required roles

To get the permissions that you need to complete the tasks in this tutorial, make sure that you have Identity and Access Management (IAM) roles that allow you to create tables and insert data.

To authenticate to your database by using IAM authentication instead of this tutorial's built-in authentication, use the notebook that shows how to use AlloyDB for PostgreSQL with the AlloyDBVectorStore class to store vector embeddings. A minimal sketch of an IAM-authenticated engine follows.
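
The following is a minimal sketch, not the tutorial's method: assuming your IAM principal has already been added as an AlloyDB database user, AlloyDBEngine falls back to IAM database authentication when the user and password arguments are omitted.

    from langchain_google_alloydb_pg import AlloyDBEngine

    # Sketch: with no user/password, AlloyDBEngine authenticates with the
    # environment's IAM credentials instead of built-in authentication.
    alloydb_engine = await AlloyDBEngine.afrom_instance(
        project_id=project_id,
        region=region,
        cluster=cluster,
        instance=instance,
        database=db_name,
    )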

Create an AlloyDB cluster and user

  1. Create an AlloyDB cluster and instance (see the gcloud sketch after this list).
    • Enable Public IP to run this tutorial from anywhere. If you use Private IP, you must run this tutorial from within your VPC.
  2. Create or select an AlloyDB database user.
    • When you create an instance, a postgres user is created and assigned a password. This user has superuser permissions.
    • This tutorial uses built-in authentication to reduce authentication friction. IAM authentication is also possible by using the AlloyDBEngine.
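
  The following gcloud sketch covers the same steps. The resource names are illustrative, and you should verify the flag names against your gcloud version:

    gcloud alloydb clusters create my-cluster \
        --region=us-central1 \
        --password=POSTGRES_PASSWORD

    gcloud alloydb instances create my-primary \
        --cluster=my-cluster \
        --region=us-central1 \
        --instance-type=PRIMARY \
        --cpu-count=2 \
        --assign-inbound-public-ip=ASSIGN_IPV4

    gcloud alloydb users create migration-user \
        --cluster=my-cluster \
        --region=us-central1 \
        --password=DB_PASSWORD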

Retrieve the code sample

  1. Clone the repository to get the code sample from GitHub:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Go to the migrations directory:

    cd langchain-google-alloydb-pg-python/samples/migrations

Retrieve data from your existing vector database

  1. Create a client.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    from pymilvus import MilvusClient

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Get all of the data from the database.

    Pinecone

    Retrieve the vector IDs from the Pinecone index:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    if ids:  # Prevents yielding an empty list.
        yield ids
    
    # Check BOTH pagination and pagination.next
    while results.pagination is not None and results.pagination.get("next") is not None:
        pagination_token = results.pagination.get("next")
        results = pinecone_index.list_paginated(
            prefix="",
            pagination_token=pagination_token,
            namespace=pinecone_namespace,
            limit=pinecone_batch_size,
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        if ids:  # Prevents yielding an empty list.
            yield ids

    Then fetch the records from the Pinecone index by ID:

    import uuid
    
    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            # You might need to update this data translation logic according to one or more of your field names
            if pinecone_id_column_name in doc:
                # pinecone_id_column_name stores the unique identifier for the content
                ids.append(doc[pinecone_id_column_name])
            else:
                # Generate a uuid if pinecone_id_column_name is missing in source
                ids.append(str(uuid.uuid4()))
            # values is the vector embedding of the content
            embeddings.append(doc["values"])
            # Check if pinecone_content_column_name exists in metadata before accessing
            if pinecone_content_column_name in doc.metadata:
                # pinecone_content_column_name stores the content which was encoded
                contents.append(str(doc.metadata[pinecone_content_column_name]))
                # Remove pinecone_content_column_name after processing
                del doc.metadata[pinecone_content_column_name]
            else:
                # Handle the missing pinecone_content_column_name field appropriately
                contents.append("")
            # metadata is the additional context
            metadatas.append(doc["metadata"])
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    from typing import Any

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        # You might need to update this data translation logic according to one or more of your field names
        # uuid is the unique identifier for the content
        ids.append(str(item.uuid))
        # weaviate_text_key is the content which was encoded
        content.append(item.properties[weaviate_text_key])
        # vector is the vector embedding of the content
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        # properties is the additional context
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        # You might need to update this data translation logic according to one or more of your field names
        # documents is the content which was encoded
        # embeddings is the vector embedding of the content
        # metadatas is the additional context
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        # ids is the unique identifier for the content
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    from typing import Any, List

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                # You might need to update this data translation logic according to one or more of your field names
                # id is the unique identifier for the content
                ids.append(str(doc.id))
                # page_content is the content which was encoded
                contents.append(doc.payload["page_content"])
                # vector is the vector embedding of the content
                embeddings.append(doc.vector)  # type: ignore
                # metadata is the additional context
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            # You might need to update this data translation logic according to one or more of your field names
            doc = page[i]
            # pk is the unqiue identifier for the content
            ids.append(doc["pk"])
            # text is the content which was encoded
            content.append(doc["text"])
            # vector is the vector embedding of the content
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            # doc is the additional context
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas
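
    Each snippet above is a generator body (note the yield statements). In the sample scripts, these snippets live inside generator functions whose output feeds the data_iterator consumed in the insert step later in this tutorial. The following is a minimal sketch of such a wrapper for the Chroma snippet; the function name is illustrative, not the sample's:

    from typing import Any, Iterator

    # Hypothetical wrapper: packages the Chroma extraction loop as a generator.
    def get_data_batch(
        chromadb_client: Any, chromadb_batch_size: int
    ) -> Iterator[tuple[list, list, list, list]]:
        offset = 0
        while True:
            docs = chromadb_client.get(
                include=["metadatas", "documents", "embeddings"],
                limit=chromadb_batch_size,
                offset=offset,
            )
            if len(docs["documents"]) == 0:
                break
            yield (
                docs["ids"],
                docs["documents"],
                docs["embeddings"].tolist(),
                docs["metadatas"],
            )
            offset += chromadb_batch_size

    data_iterator = get_data_batch(chromadb_client, chromadb_batch_size)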

Initialize the AlloyDB table

  1. Define an embedding service.

    The VectorStore interface requires an embedding service. This workflow doesn't generate new embeddings, so the FakeEmbeddings class is used to avoid any costs.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepare the AlloyDB table.

    1. Connect to AlloyDB by using a Public IP connection. For more information, see Specifying IP address type.

      Pinecone

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,  # Optionally use IPTypes.PRIVATE
      )

      Weaviate

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Chroma

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. Create a table to copy the data into, if it doesn't already exist.

      Pinecone

      from langchain_google_alloydb_pg import Column
      
      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types if not using the UUID data type
          # id_column=Column("langchain_id", "TEXT"),  # Default is Column("langchain_id", "UUID")
          # overwrite_existing=True,  # Drop the old table and Create a new vector store table
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

Initialize a vector store object

This code adds extra vector embedding metadata to the langchain_metadata column in JSON format. To filter more efficiently, organize that metadata into separate columns, as sketched below. For more information, see Create a custom vector store.
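
As a minimal sketch (not part of the sample), assuming your metadata contains a field named source, you can promote it to its own column when initializing the table:

    from langchain_google_alloydb_pg import Column

    # Sketch: "source" is an illustrative metadata field promoted to a column
    # so that filters on it don't have to parse the langchain_metadata JSON.
    await alloydb_engine.ainit_vectorstore_table(
        table_name=alloydb_table,
        vector_size=vector_size,
        metadata_columns=[Column("source", "TEXT")],
    )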

  1. To initialize the vector store object, run the following:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Insert the data into the AlloyDB table. This snippet schedules aadd_embeddings() calls as concurrent tasks, capping the number of in-flight batches at max_concurrency:

    Pinecone

    import asyncio
    from typing import Any

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    import asyncio
    from typing import Any

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    import asyncio
    from typing import Any

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    import asyncio
    from typing import Any

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    import asyncio
    from typing import Any

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Run the migration script

  1. Set up a Python environment, for example with the virtual environment sketched below.
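
    As a minimal sketch, one common setup is a virtual environment:

    python3 -m venv venv
    source venv/bin/activate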

  2. Install the sample dependencies:

    pip install -r requirements.txt
  3. Run the sample migration.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Before running the sample, make the following replacements:

    • PINECONE_API_KEY: the Pinecone API key.
    • PINECONE_NAMESPACE: the Pinecone namespace.
    • PINECONE_INDEX_NAME: the name of the Pinecone index.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Before running the sample, make the following replacements:

    • WEAVIATE_API_KEY: the Weaviate API key.
    • WEAVIATE_CLUSTER_URL: the Weaviate cluster URL.
    • WEAVIATE_COLLECTION_NAME: the name of the Weaviate collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Before running the sample, make the following replacements:

    • CHROMADB_PATH: the path to the Chroma database.
    • CHROMADB_COLLECTION_NAME: the name of the Chroma database collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Before running the sample, make the following replacements:

    • QDRANT_PATH: the path to the Qdrant database.
    • QDRANT_COLLECTION_NAME: the name of the Qdrant collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Before running the sample, make the following replacements:

    • MILVUS_URI: the Milvus URI.
    • MILVUS_COLLECTION_NAME: the name of the Milvus collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    If the migration succeeds, logs similar to the following are printed without any errors:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Open AlloyDB Studio to view the migrated data. For more information, see Manage your data using AlloyDB Studio.
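
    To spot-check the migrated rows, you can run a query similar to the following in AlloyDB Studio. This is a sketch: my_table stands for whatever you passed as alloydb_table, and langchain_id and content are the default column names created by ainit_vectorstore_table.

    -- Sketch: replace my_table with your alloydb_table value.
    SELECT langchain_id, content
    FROM my_table
    LIMIT 5;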