建立自訂連接器

本頁說明如何建立自訂連接器。

事前準備

開始之前,請務必備妥以下項目:

  • 檢查 Google Cloud 專案是否已啟用計費功能

  • 安裝初始化 Google Cloud CLI。確認已向專案驗證。

  • 取得專案的 Discovery Engine 管理員存取權。 Google Cloud

  • 取得第三方資料來源的存取憑證 (例如 API 金鑰或資料庫驗證)。

  • 制定明確的資料對應計畫。這必須包括要建立索引的欄位,以及如何表示存取權控管 (包括第三方身分)。

建立基本連接器

本節將示範如何以所選語言建立自訂連接器。這裡顯示的原則和模式適用於任何外部系統。只要以所選語言,為特定來源調整 API 呼叫和資料轉換,即可建立基本連接器。

擷取資料

如要開始使用,請從第三方資料來源擷取資料。在本範例中,我們將示範如何使用分頁擷取貼文。在實際工作環境中,建議針對大型資料集使用串流方法。這樣可避免一次載入所有資料時可能發生的記憶體問題。

Python

    def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
        #Fetch all posts from the given site.#
        url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
        posts: List[dict] = []
        page = 1
        while True:
            resp = requests.get(
                url,
                params={"page": page, "per_page": per_page},
            )
            resp.raise_for_status()
            batch = resp.json()
            posts.extend(batch)
            if len(batch) < per_page:
                break
            page += 1
        return posts

轉換資料

如要將來源資料轉換為 Discovery Engine 文件格式,請按照下列範例酬載的結構進行。您可以視需要加入任意數量的鍵/值組合。舉例來說,您可以加入完整內容,進行全面搜尋。或者,您也可以加入結構化欄位進行多面向搜尋,或結合兩種方式。

Python

    def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
        # Convert WP posts into Discovery Engine Document messages.
        docs: List[discoveryengine.Document] = []
        for post in posts:
            payload = {
                "title": post.get("title", {}).get("rendered"),
                "body": post.get("content", {}).get("rendered"),
                "url": post.get("link"),
                "author": post.get("author"),
                "categories": post.get("categories"),
                "tags": post.get("tags"),
                "date": post.get("date"),
            }
            doc = discoveryengine.Document(
                id=str(post["id"]),
                json_data=json.dumps(payload),
            )
            docs.append(doc)
        return docs

擷取或建立身分識別儲存空間

如要管理使用者身分和群組以控管存取權,您必須擷取或建立身分識別存放區。這項函式會依據 ID、專案和位置取得現有的身分識別商店。如果身分識別存放區不存在,系統會建立並傳回新的空白身分識別存放區。

Python

    def get_or_create_ims_data_store(
        project_id: str,
        location: str,
        identity_mapping_store_id: str,
    ) -> discoveryengine.DataStore:
      """Get or create a DataStore."""
      # Initialize the client
      client_ims = discoveryengine.IdentityMappingStoreServiceClient()
      # Construct the parent resource name
      parent_ims = client_ims.location_path(project=project_id, location=location)

      try:
        # Create the request object
        name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
        request = discoveryengine.GetIdentityMappingStoreRequest(
            name=name,
        )
        return client_ims.get_identity_mapping_store(request=request)
      except:
        # Create the IdentityMappingStore object (it can be empty for basic creation)
        identity_mapping_store = discoveryengine.IdentityMappingStore()
        # Create the request object
        request = discoveryengine.CreateIdentityMappingStoreRequest(
            parent=parent_ims,
            identity_mapping_store=identity_mapping_store,
            identity_mapping_store_id=identity_mapping_store_id,
        )
        return client_ims.create_identity_mapping_store(request=request)

get_or_create_ims_data_store 函式會使用下列鍵變數:

  • project_id: Google Cloud 專案的 ID。
  • location:身分對應商店的位置。 Google Cloud
  • identity_mapping_store_id:身分識別資訊商店的專屬 ID。
  • client_ims:discoveryengine.IdentityMappingStoreServiceClient 的執行個體,用於與身分識別資訊商店 API 互動。
  • parent_ims:父項位置的資源名稱,使用 client_ims.location_path 建構。
  • name:身分對應商店的完整資源名稱,用於 GetIdentityMappingStoreRequest。

將識別資訊對應檔匯入識別資訊存放區

如要將身分對應項目載入指定的身分存放區,請使用這個函式。這項作業會接收身分對應項目清單,並啟動內嵌匯入作業。這對建立存取控管和個人化所需的關係 (使用者、群組和外部身分) 至關重要。

Python

def load_ims_data(
    ims_store: discoveryengine.DataStore,
    id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
  """Get the IMS data store."""
  # Initialize the client
  client_ims = discoveryengine.IdentityMappingStoreServiceClient()

  #  Create the InlineSource object
  inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
      identity_mapping_entries=id_mapping_data
  )

  # Create the main request object
  request_ims = discoveryengine.ImportIdentityMappingsRequest(
      identity_mapping_store=ims_store.name,
      inline_source=inline_source,
  )

  try:
    # Create the InlineSource object, which holds your list of entries
    operation = client_ims.import_identity_mappings(
        request=request_ims,
    )
    result = operation.result()
    return result

  except Exception as e:
    print(f"IMS Load Error: {e}")
    result = operation.result()
    return result

load_ims_data 函式會使用下列主要變數:

  • ims_store:discoveryengine.DataStore 物件,代表要載入資料的身分對應商店。
  • id_mapping_data:discoveryengine.IdentityMappingEntry 物件清單,每個物件都包含外部身分及其對應的使用者或群組 ID。
  • result:discoveryengine.DataStore 類型的傳回值。

建立資料儲存庫

如要使用自訂連接器,請先初始化內容的資料存放區。自訂連接器請使用 default_collectionIndustryVertical 參數可自訂資料存放區的行為,以因應特定用途。GENERIC 適用於大多數情況。不過,您可以為特定產業選擇不同的值,例如 MEDIAHEALTHCARE_FHIR。設定顯示名稱和其他屬性,以符合專案的命名慣例和需求。

Python

def get_or_create_data_store(
    project_id: str,
    location: str,
    display_name: str,
    data_store_id: str,
    identity_mapping_store: str,
) -> discoveryengine.DataStore:
  """Get or create a DataStore."""
  client = discoveryengine.DataStoreServiceClient()
  ds_name = client.data_store_path(project_id, location, data_store_id)
  try:
    result = client.get_data_store(request={"name": ds_name})
    return result
  except:
    parent = client.collection_path(project_id, location, "default_collection")
    operation = client.create_data_store(
        request={
            "parent": parent,
            "data_store": discoveryengine.DataStore(
                display_name=display_name,
                acl_enabled=True,
                industry_vertical=discoveryengine.IndustryVertical.GENERIC,
                identity_mapping_store=identity_mapping_store,
            ),
            "data_store_id": data_store_id,
        }
    )
    result = operation.result()
    return result

get_or_create_data_store 函式會使用下列鍵變數:

  • project_id: Google Cloud 專案的 ID。
  • location:資料存放區的位置。 Google Cloud
  • display_name:資料儲區的易記顯示名稱。
  • data_store_id:資料儲存庫的專屬 ID。
  • identity_mapping_store:要繫結的身分對應存放區資源名稱。
  • result:discoveryengine.DataStore 類型的傳回值。

直接上傳文件

如要直接將文件傳送至 Discovery Engine,請使用內嵌上傳功能。這個方法預設使用增量對帳模式,不支援完整對帳模式。在增量模式下,系統會新增文件並更新現有文件,但不會刪除來源中已不存在的文件。完整對帳模式會將資料儲存庫與來源資料同步處理,包括刪除來源中已不存在的文件。

如果系統 (例如 CRM) 經常處理資料的微小變更,就非常適合使用增量對帳。系統只會傳送特定變更,而非同步處理整個資料庫,因此速度更快、效率更高。您仍可定期執行完整同步作業,確保整體資料完整性。

Python

    def upload_documents_inline(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        documents: List[discoveryengine.Document],
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Inline import of Document messages."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
                documents=documents,
            ),
        )
        operation = client.import_documents(request=request)
        operation.result()
        result = operation.metadata
        return result

upload_documents_inline 函式會使用下列重要變數:

  • project_id: Google Cloud 專案的 ID。
  • location:資料存放區的位置。 Google Cloud
  • data_store_id:資料儲存庫的 ID。
  • branch_id:資料儲存庫內的分支 ID (通常為「0」)。
  • documents:要上傳的 discoveryengine.Document 物件清單。
  • result:類型為 discoveryengine.ImportDocumentsMetadata 的傳回值。

驗證連結器

如要驗證連接器是否正常運作,請執行測試,確保資料能從來源正確流向 Discovery Engine。

Python

    SITE = "https://altostrat.com"
    PROJECT_ID = "ucs-3p-connectors-testing"
    LOCATION = "global"
    IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
    DATA_STORE_ID = "my-acl-ds-id1"
    BRANCH_ID = "0"

    posts = fetch_posts(SITE)
    docs = convert_posts_to_documents(posts)
    print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")

    try:
      # Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
      ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
      print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")

      RAW_IDENTITY_MAPPING_DATA = [
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_1",
              user_id="testuser1@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              user_id="testuser2@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              group_id="testgroup1@example.com",
          )
      ]

      # Step #2: Load IMS Data
      response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
      print(
          "\nStep #2: Load Data in IMS Store successful.", response
      )

      # Step #3: Create Entity Data Store & Bind IMS Data Store
      data_store =  get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
      print("\nStep #3: Entity Data Store Create Result: ", data_store)

      metadata = upload_documents_inline(
          PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
      )
      print(f"Uploaded {metadata.success_count} documents inline.")

    except gcp_exceptions.GoogleAPICallError as e:
      print(f"\n--- API Call Failed ---")
      print(f"Server Error Message: {e.message}")
      print(f"Status Code: {e.code}")

    except Exception as e:
      print(f"An error occurred: {e}")

確認連接器程式碼使用下列主要變數:

  • SITE:第三方資料來源的基本網址。
  • PROJECT_ID:您的 Google Cloud 專案 ID
  • LOCATION:資源的 Google Cloud 位置。
  • IDENTITY_MAPPING_STORE_ID:身分對應商店的專屬 ID。
  • DATA_STORE_ID:資料商店的專屬 ID。
  • BRANCH_ID:資料儲存庫中的分支 ID。
  • posts:儲存從第三方來源擷取的貼文。
  • docs:以 discoveryengine.Document 格式儲存轉換後的文件。
  • ims_store:用於身分對應的已擷取或建立 discoveryengine.DataStore 物件。
  • RAW_IDENTITY_MAPPING_DATA:discoveryengine.IdentityMappingEntry 物件的清單。

預期輸出內容:

Shell

  Fetched 20 posts and converted to 20 documents.
  STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
  Step #2: Load Data in IMS Store successful.
  Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
  display_name: "my-acl-datastore"
  industry_vertical: GENERIC
  create_time {
    seconds: 1760906997
    nanos: 192641000
  }
  default_schema_id: "default_schema"
  acl_enabled: true
  identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
  Uploaded 20 documents inline.

此時,您也可以在 Google Google Cloud 控制台中查看資料存放區:

自訂連接器資料儲存庫
自訂連接器資料儲存庫。

使用「儲存空間上傳」 Google Cloud 建立連接器

雖然內嵌匯入功能很適合開發作業,但生產環境連接器應使用 Google Cloud 儲存空間,以提升可擴充性並啟用完整對帳模式。這個方法可有效處理大型資料集,並支援自動刪除第三方資料來源中已不存在的文件。

將文件轉換為 JSONL

如要準備文件,以便大量匯入 Discovery Engine,請將文件轉換為 JSON Lines 格式。

Python

    def convert_documents_to_jsonl(
        documents: List[discoveryengine.Document],
    ) -> str:
        """Serialize Document messages to JSONL."""
        return "\n".join(
            discoveryengine.Document.to_json(doc, indent=None)
            for doc in documents
        ) + "\n"

convert_documents_to_jsonl 函式會使用下列變數:

  • documents:要轉換的 discoveryengine.Document 物件清單。

上傳至 Google Cloud 儲存空間

如要有效大量匯入資料,請在 Google Cloud 儲存空間中暫存資料。

Python

    def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
        """Upload JSONL content to Google Cloud Storage."""
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(jsonl, content_type="application/json")
        return f"gs://{bucket_name}/{blob_name}"

upload_jsonl_to_gcs 函式會使用下列主要變數:

  • jsonl:要上傳的 JSONL 格式字串內容。
  • bucket_name: Google Cloud Storage bucket 的名稱。
  • blob_name:指定值區中的 Blob (物件) 名稱。

從 Google Cloud 儲存空間匯入並完成完整對帳

如要使用完整對帳模式執行完整資料同步,請使用這個方法。這可確保資料儲存庫與第三方資料來源完全一致,並自動移除不再存在的文件。

Python

    def import_documents_from_gcs(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        gcs_uri: str,
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            gcs_source=gcs_source,
            reconciliation_mode=
                discoveryengine.ImportDocumentsRequest
                .ReconciliationMode.FULL,
        )
        operation = client.import_documents(request=request)
        operation.result()
        return operation.metadata

import_documents_from_gcs 函式會使用下列鍵變數:

  • project_id: Google Cloud 專案的 ID。
  • location:資料存放區的位置。 Google Cloud
  • data_store_id:資料儲存庫的 ID。
  • branch_id:資料儲存庫內的分支 ID (通常為「0」)。
  • gcs_uri:指向 JSONL 檔案的 Google Cloud 儲存空間 URI。

測試 Google Cloud Storage 上傳作業

如要驗證 Google Cloud 以儲存空間為基礎的匯入工作流程,請執行下列操作:

Python

  BUCKET = "your-existing-bucket"
  BLOB = "path-to-any-blob/wp/posts.jsonl"
  SITE = "https://altostrat.com"
  PROJECT_ID = "ucs-3p-connectors-testing"
  LOCATION = "global"
  IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
  DATA_STORE_ID = "your-data-store-id"
  BRANCH_ID = "0"
  jsonl_payload = convert_documents_to_jsonl(docs)
  gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
  posts = fetch_posts(SITE)
  docs = convert_posts_to_documents(posts)
  print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
  print("Uploaded to:", gcs_uri)

  metadata = import_documents_from_gcs(
      PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
  )
  print(f"Imported: {metadata.success_count} documents")

測試 Google Cloud 儲存空間上傳時會用到下列主要變數:

  • BUCKET: Google Cloud Storage bucket 的名稱。
  • BLOB:bucket 內 blob 的路徑。
  • SITE:第三方資料來源的基本網址。
  • PROJECT_ID:您的 Google Cloud 專案 ID
  • LOCATION:資源的位置 (例如 Google Cloud 「global」)。
  • IDENTITY_MAPPING_STORE_ID:身分對應商店的專屬 ID。
  • DATA_STORE_ID:資料商店的專屬 ID。
  • BRANCH_ID:資料儲存庫內的分支 ID (通常為「0」)。
  • jsonl_payload:轉換為 JSONL 格式的文件。
  • gcs_uri:上傳的 JSONL 檔案的 Google Cloud 儲存空間 URI。

預期輸出內容:

Shell

    Fetched 20 posts and converted to 20 documents.
    Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
    Imported: 20 documents

管理權限

如要在企業環境中管理文件層級的存取權,Gemini Enterprise 支援存取控制清單 (ACL) 和身分對應,有助於限制使用者可查看的內容。

在資料儲存區中啟用 ACL

如要在建立資料儲存庫時啟用 ACL,請執行下列指令:

Python

  # get_or_create_data_store()
  "data_store": discoveryengine.DataStore(
      display_name=data_store_id,
      industry_vertical=discoveryengine.IndustryVertical.GENERIC,
      acl_enabled=True, # ADDED
  )

在文件中新增 ACL

如要在轉換文件時計算並納入 AclInfo,請執行下列步驟:

Python

  # convert_posts_to_documents()
  doc = discoveryengine.Document(
      id=str(post["id"]),
      json_data=json.dumps(payload),
      acl_info=discoveryengine.Document.AclInfo(
          readers=[{
              "principals": [
                  {"user_id": "baklavainthebalkans@gmail.com"},
                  {"user_id": "cloudysanfrancisco@gmail.com"}
              ]
          }]
      ),
  )

公開內容

如要將文件設為可公開存取,請依下列方式設定 readers 欄位:

Python

  readers=[{"idp_wide": True}]

驗證 ACL

如要驗證 ACL 設定是否正常運作,請考慮下列事項:

  • 以無法存取文件的使用者身分進行搜尋。

  • 檢查 Cloud Storage 中上傳的文件結構,並與參照文件比較。

JSON

  {
    "id": "108",
    "jsonData": "{...}",
    "aclInfo": {
      "readers": [
        {
          "principals": [
            { "userId": "baklavainthebalkans@gmail.com" },
            { "userId": "cloudysanfrancisco@gmail.com" }
          ],
          "idpWide": false
        }
      ]
    }
  }

使用識別資訊對應

在下列情況下使用身分識別對應:

  • 第三方資料來源使用非 Google 身分

  • 您想參照自訂群組 (例如 wp-admins),而非個別使用者

  • API 只會傳回群組名稱

  • 您需要手動將使用者分組,以確保規模或一致性

如要進行身分對應,請按照下列步驟操作:

  1. 建立並連結身分資料儲存庫。
  2. 匯入外部身分 (例如 external_group:wp-admins)。匯入時請勿加入 external_group: 前置字元,例如:

    JSON

      {
        "externalIdentity": "wp-admins",
        "userId": "user@example.com"
      }
    
  3. 文件ACL 資訊中,於 principal identifier 中定義外部實體 ID。參照自訂群組時,請在 groupId 欄位中使用 external_group: 前置字元。

  4. 匯入時,文件 ACL 資訊中的群組 ID 必須加上 external_group: 前置字元,但將身分匯入對應儲存空間時則不需要。 含有身分識別資訊對應的範例文件:

    JSON

      {
        "id": "108",
        "aclInfo": {
          "readers": [
            {
              "principals": [
                {
                  "userId": "cloudysanfrancisco@gmail.com"
                },
                {
                  "groupId": "external_group:wp-admins"
                }
              ]
            }
          ]
        },
        "structData": {
          "id": 108,
          "date": "2025-04-24T18:16:04",
          ...
        }
      }
    

後續步驟

Gemini Enterprise 用戶端程式庫