本頁說明如何建立自訂連接器。
事前準備
開始之前,請務必備妥以下項目:
取得專案的 Discovery Engine 管理員存取權。 Google Cloud
取得第三方資料來源的存取憑證 (例如 API 金鑰或資料庫驗證)。
制定明確的資料對應計畫。這必須包括要建立索引的欄位,以及如何表示存取權控管 (包括第三方身分)。
建立基本連接器
本節將示範如何以所選語言建立自訂連接器。這裡顯示的原則和模式適用於任何外部系統。只要以所選語言,為特定來源調整 API 呼叫和資料轉換,即可建立基本連接器。
擷取資料
如要開始使用,請從第三方資料來源擷取資料。在本範例中,我們將示範如何使用分頁擷取貼文。在實際工作環境中,建議針對大型資料集使用串流方法。這樣可避免一次載入所有資料時可能發生的記憶體問題。
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
轉換資料
如要將來源資料轉換為 Discovery Engine 文件格式,請按照下列範例酬載的結構進行。您可以視需要加入任意數量的鍵/值組合。舉例來說,您可以加入完整內容,進行全面搜尋。或者,您也可以加入結構化欄位進行多面向搜尋,或結合兩種方式。
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
擷取或建立身分識別儲存空間
如要管理使用者身分和群組以控管存取權,您必須擷取或建立身分識別存放區。這項函式會依據 ID、專案和位置取得現有的身分識別商店。如果身分識別存放區不存在,系統會建立並傳回新的空白身分識別存放區。
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
get_or_create_ims_data_store 函式會使用下列鍵變數:
project_id: Google Cloud 專案的 ID。location:身分對應商店的位置。 Google Cloudidentity_mapping_store_id:身分識別資訊商店的專屬 ID。client_ims:discoveryengine.IdentityMappingStoreServiceClient 的執行個體,用於與身分識別資訊商店 API 互動。parent_ims:父項位置的資源名稱,使用 client_ims.location_path 建構。name:身分對應商店的完整資源名稱,用於 GetIdentityMappingStoreRequest。
將識別資訊對應檔匯入識別資訊存放區
如要將身分對應項目載入指定的身分存放區,請使用這個函式。這項作業會接收身分對應項目清單,並啟動內嵌匯入作業。這對建立存取控管和個人化所需的關係 (使用者、群組和外部身分) 至關重要。
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
load_ims_data 函式會使用下列主要變數:
ims_store:discoveryengine.DataStore 物件,代表要載入資料的身分對應商店。id_mapping_data:discoveryengine.IdentityMappingEntry 物件清單,每個物件都包含外部身分及其對應的使用者或群組 ID。result:discoveryengine.DataStore 類型的傳回值。
建立資料儲存庫
如要使用自訂連接器,請先初始化內容的資料存放區。自訂連接器請使用 default_collection。IndustryVertical 參數可自訂資料存放區的行為,以因應特定用途。GENERIC 適用於大多數情況。不過,您可以為特定產業選擇不同的值,例如 MEDIA 或 HEALTHCARE_FHIR。設定顯示名稱和其他屬性,以符合專案的命名慣例和需求。
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
get_or_create_data_store 函式會使用下列鍵變數:
project_id: Google Cloud 專案的 ID。location:資料存放區的位置。 Google Clouddisplay_name:資料儲區的易記顯示名稱。data_store_id:資料儲存庫的專屬 ID。identity_mapping_store:要繫結的身分對應存放區資源名稱。result:discoveryengine.DataStore 類型的傳回值。
直接上傳文件
如要直接將文件傳送至 Discovery Engine,請使用內嵌上傳功能。這個方法預設使用增量對帳模式,不支援完整對帳模式。在增量模式下,系統會新增文件並更新現有文件,但不會刪除來源中已不存在的文件。完整對帳模式會將資料儲存庫與來源資料同步處理,包括刪除來源中已不存在的文件。
如果系統 (例如 CRM) 經常處理資料的微小變更,就非常適合使用增量對帳。系統只會傳送特定變更,而非同步處理整個資料庫,因此速度更快、效率更高。您仍可定期執行完整同步作業,確保整體資料完整性。
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
upload_documents_inline 函式會使用下列重要變數:
project_id: Google Cloud 專案的 ID。location:資料存放區的位置。 Google Clouddata_store_id:資料儲存庫的 ID。branch_id:資料儲存庫內的分支 ID (通常為「0」)。documents:要上傳的 discoveryengine.Document 物件清單。result:類型為 discoveryengine.ImportDocumentsMetadata 的傳回值。
驗證連結器
如要驗證連接器是否正常運作,請執行測試,確保資料能從來源正確流向 Discovery Engine。
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
確認連接器程式碼使用下列主要變數:
SITE:第三方資料來源的基本網址。PROJECT_ID:您的 Google Cloud 專案 IDLOCATION:資源的 Google Cloud 位置。IDENTITY_MAPPING_STORE_ID:身分對應商店的專屬 ID。DATA_STORE_ID:資料商店的專屬 ID。BRANCH_ID:資料儲存庫中的分支 ID。posts:儲存從第三方來源擷取的貼文。docs:以 discoveryengine.Document 格式儲存轉換後的文件。ims_store:用於身分對應的已擷取或建立 discoveryengine.DataStore 物件。RAW_IDENTITY_MAPPING_DATA:discoveryengine.IdentityMappingEntry 物件的清單。
預期輸出內容:
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
此時,您也可以在 Google Google Cloud 控制台中查看資料存放區:
使用「儲存空間上傳」 Google Cloud 建立連接器
雖然內嵌匯入功能很適合開發作業,但生產環境連接器應使用 Google Cloud 儲存空間,以提升可擴充性並啟用完整對帳模式。這個方法可有效處理大型資料集,並支援自動刪除第三方資料來源中已不存在的文件。
將文件轉換為 JSONL
如要準備文件,以便大量匯入 Discovery Engine,請將文件轉換為 JSON Lines 格式。
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
convert_documents_to_jsonl 函式會使用下列變數:
documents:要轉換的 discoveryengine.Document 物件清單。
上傳至 Google Cloud 儲存空間
如要有效大量匯入資料,請在 Google Cloud 儲存空間中暫存資料。
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
upload_jsonl_to_gcs 函式會使用下列主要變數:
jsonl:要上傳的 JSONL 格式字串內容。bucket_name: Google Cloud Storage bucket 的名稱。blob_name:指定值區中的 Blob (物件) 名稱。
從 Google Cloud 儲存空間匯入並完成完整對帳
如要使用完整對帳模式執行完整資料同步,請使用這個方法。這可確保資料儲存庫與第三方資料來源完全一致,並自動移除不再存在的文件。
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
import_documents_from_gcs 函式會使用下列鍵變數:
project_id: Google Cloud 專案的 ID。location:資料存放區的位置。 Google Clouddata_store_id:資料儲存庫的 ID。branch_id:資料儲存庫內的分支 ID (通常為「0」)。gcs_uri:指向 JSONL 檔案的 Google Cloud 儲存空間 URI。
測試 Google Cloud Storage 上傳作業
如要驗證 Google Cloud 以儲存空間為基礎的匯入工作流程,請執行下列操作:
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
測試 Google Cloud 儲存空間上傳時會用到下列主要變數:
BUCKET: Google Cloud Storage bucket 的名稱。BLOB:bucket 內 blob 的路徑。SITE:第三方資料來源的基本網址。PROJECT_ID:您的 Google Cloud 專案 IDLOCATION:資源的位置 (例如 Google Cloud 「global」)。IDENTITY_MAPPING_STORE_ID:身分對應商店的專屬 ID。DATA_STORE_ID:資料商店的專屬 ID。BRANCH_ID:資料儲存庫內的分支 ID (通常為「0」)。jsonl_payload:轉換為 JSONL 格式的文件。gcs_uri:上傳的 JSONL 檔案的 Google Cloud 儲存空間 URI。
預期輸出內容:
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
管理權限
如要在企業環境中管理文件層級的存取權,Gemini Enterprise 支援存取控制清單 (ACL) 和身分對應,有助於限制使用者可查看的內容。
在資料儲存區中啟用 ACL
如要在建立資料儲存庫時啟用 ACL,請執行下列指令:
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
在文件中新增 ACL
如要在轉換文件時計算並納入 AclInfo,請執行下列步驟:
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
公開內容
如要將文件設為可公開存取,請依下列方式設定 readers 欄位:
Python
readers=[{"idp_wide": True}]
驗證 ACL
如要驗證 ACL 設定是否正常運作,請考慮下列事項:
以無法存取文件的使用者身分進行搜尋。
檢查 Cloud Storage 中上傳的文件結構,並與參照文件比較。
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
使用識別資訊對應
在下列情況下使用身分識別對應:
第三方資料來源使用非 Google 身分
您想參照自訂群組 (例如 wp-admins),而非個別使用者
API 只會傳回群組名稱
您需要手動將使用者分組,以確保規模或一致性
如要進行身分對應,請按照下列步驟操作:
- 建立並連結身分資料儲存庫。
匯入外部身分 (例如 external_group:wp-admins)。匯入時請勿加入 external_group: 前置字元,例如:
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }在文件的 ACL 資訊中,於
principal identifier中定義外部實體 ID。參照自訂群組時,請在groupId欄位中使用external_group:前置字元。匯入時,文件 ACL 資訊中的群組 ID 必須加上
external_group:前置字元,但將身分匯入對應儲存空間時則不需要。 含有身分識別資訊對應的範例文件:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
後續步驟
- 如要提供查詢資料的使用者介面,請在 Gemini Enterprise 中建立應用程式,然後將其連結至現有的自訂連接器資料儲存庫。