本页面介绍了如何创建自定义连接器。
准备工作
在开始之前,请确保您已执行以下操作:
获取您的 Google Cloud 项目的 Discovery Engine 管理员访问权限。
获取第三方数据源的访问凭证(例如 API 密钥或数据库身份验证)。
制定明确的数据映射计划。这必须包括要编入索引的字段以及如何表示访问权限控制(包括第三方身份)。
创建基本连接器
本部分演示了如何使用所选语言创建自定义连接器。此处所述的原则和模式适用于任何外部系统。只需使用所选语言调整特定来源的 API 调用和数据转换,即可创建基本连接器。
提取数据
首先,从第三方数据源检索数据。在此示例中,我们将演示如何使用分页功能来提取帖子。对于生产环境,我们建议使用流式处理方法来处理大型数据集。这样可以防止一次性加载所有数据时可能出现的内存问题。
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
转换数据
如需将源数据转换为 Discovery Engine 文档格式,请按以下示例载荷所示设计文档结构。您可以根据需要添加任意数量的键值对。例如,您可以添加完整内容以进行全面搜索。或者,您也可以添加结构化字段以进行分面搜索,或同时添加两者的组合。
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
检索或创建身份存储区
如需管理用户身份和群组以进行访问权限控制,您必须检索或创建身份存储区。此函数通过 ID、项目和位置获取现有身份存储区。如果身份存储区不存在,则会创建并返回一个新的空身份存储区。
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
get_or_create_ims_data_store 函数使用以下键变量:
project_id:您的 Google Cloud 项目的 ID。location:身份映射存储区的 Google Cloud 位置。identity_mapping_store_id:身份存储区的唯一标识符。client_ims:discoveryengine.IdentityMappingStoreServiceClient 的实例,用于与身份存储区 API 进行交互。parent_ims:父级位置的资源名称,使用 client_ims.location_path 构建。name:身份映射存储区的完整资源名称,用于 GetIdentityMappingStoreRequest。
将身份映射注入到身份存储区
如需将身份映射条目加载到指定的身份存储区,请使用此函数。该函数接受身份映射条目列表,并启动内嵌导入操作。这对于建立访问权限控制和个性化所需的“用户-群组-外部身份”关系至关重要。
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
load_ims_data 函数使用以下键变量:
ims_store:discoveryengine.DataStore 对象,表示加载数据的身份映射存储区。id_mapping_data:discoveryengine.IdentityMappingEntry 对象的列表,每个对象都包含一个外部身份及其对应的用户或群组 ID。result:类型为 discoveryengine.DataStore 的返回值。
创建数据存储区
如需使用自定义连接器,您必须为内容初始化数据存储区。请将 default_collection 用于自定义连接器。IndustryVertical 参数用于针对特定应用场景自定义数据存储区的行为。GENERIC 适用于大多数情况。不过,您可以为特定行业选择其他值,例如 MEDIA 或 HEALTHCARE_FHIR。根据项目的命名惯例和要求配置显示名称和其他属性。
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
get_or_create_data_store 函数使用以下键变量:
project_id:您的 Google Cloud 项目的 ID。location:数据存储区的 Google Cloud位置。display_name:数据存储区的人类可读显示名称。data_store_id:数据存储区的唯一标识符。identity_mapping_store:要绑定的身份映射存储区的资源名称。result:类型为 discoveryengine.DataStore 的返回值。
内嵌上传文档
如需将文档直接发送到 Discovery Engine,请使用内嵌上传功能。此方法默认使用增量协调模式,不支持完全协调模式。在增量模式下,系统会添加新文档并更新现有文档,但不会删除来源中不再存在的文档。完全协调模式会将数据存储区与您的源数据同步,包括删除来源中不再存在的文档。
增量协调非常适合处理频繁、少量数据更改的系统,例如 CRM。无需同步整个数据库,只需发送特定更改,从而加快流程并提高效率。您仍可以定期执行完全同步,以保持整体数据完整性。
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
upload_documents_inline 函数使用以下键变量:
project_id:您的 Google Cloud 项目的 ID。location:数据存储区的 Google Cloud 位置。data_store_id:数据存储区的 ID。branch_id:数据存储区中分支的 ID(通常为“0”)。documents:要上传的 discoveryengine.Document 对象的列表。result:类型为 discoveryengine.ImportDocumentsMetadata 的返回值。
验证连接器
如需验证连接器是否按预期运行,请执行测试运行,以确保从来源到 Discovery Engine 的数据传输正确。
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
验证连接器代码是否使用以下键变量:
SITE:第三方数据源的基础网址。PROJECT_ID:您的 Google Cloud 项目 ID。LOCATION:资源所在的 Google Cloud 位置。IDENTITY_MAPPING_STORE_ID:身份映射存储区的唯一 ID。DATA_STORE_ID:数据存储区的唯一 ID。BRANCH_ID:数据存储区中分支的 ID。posts:存储从第三方来源提取的帖子。docs:以 discoveryengine.Document 格式存储转换后的文档。ims_store:已检索或创建的 discoveryengine.DataStore 对象,用于身份映射。RAW_IDENTITY_MAPPING_DATA:discoveryengine.IdentityMappingEntry 对象的列表。
预期输出:
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
此时,您还可以在 Google Google Cloud 控制台中看到数据存储区:
创建使用 Google Cloud Storage 上传功能的连接器
虽然内嵌导入功能非常适合开发,但生产连接器应使用 Google Cloud Storage,以提高可伸缩性并启用完全协调模式。此方法可高效处理大型数据集,并支持自动删除第三方数据源中不再存在的文档。
将文档转换为 JSONL
如需准备文档以批量导入 Discovery Engine,请将其转换为 JSON 行格式。
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
convert_documents_to_jsonl 函数使用以下变量:
documents:要转换的 discoveryengine.Document 对象的列表。
上传到 Google Cloud Storage
为了实现高效的批量导入,请将数据暂存在 Google Cloud Storage 中。
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
upload_jsonl_to_gcs 函数使用以下键变量:
jsonl:要上传的 JSONL 格式的字符串内容。bucket_name: Google Cloud Storage 存储桶的名称。blob_name:指定存储桶中的 blob(对象)的名称。
从 Google Cloud Storage 导入并进行完全协调
如需使用完全协调模式执行完整的数据同步,请使用此方法。这样可确保数据存储区完全镜像第三方数据源,并自动移除不再存在的所有文档。
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
import_documents_from_gcs 函数使用以下键变量:
project_id:您的 Google Cloud 项目的 ID。location:数据存储区的 Google Cloud 位置。data_store_id:数据存储区的 ID。branch_id:数据存储区中分支的 ID(通常为“0”)。gcs_uri:指向 JSONL 文件的 Google Cloud Storage URI。
测试 Google Cloud Storage 上传
如需验证基于 Google Cloud Storage 的导入工作流,请执行以下操作:
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
以下键变量用于测试 Google Cloud Storage 上传:
BUCKET: Google Cloud Storage 存储桶的名称。BLOB:存储桶中 blob 的路径。SITE:第三方数据源的基础网址。PROJECT_ID:您的 Google Cloud 项目 ID。LOCATION:资源所在的 Google Cloud 位置(例如,“全球”)。IDENTITY_MAPPING_STORE_ID:身份映射存储区的唯一 ID。DATA_STORE_ID:数据存储区的唯一 ID。BRANCH_ID:数据存储区中分支的 ID(通常为“0”)。jsonl_payload:转换为 JSONL 格式的文档。gcs_uri:上传的 JSONL 文件的 Google Cloud Storage URI。
预期输出:
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
管理权限
为了在企业环境中管理文档级访问权限,Gemini Enterprise 支持访问控制列表 (ACL) 和身份映射,有助于限制用户可以查看的内容。
在数据存储区中启用 ACL
如需在创建数据存储区时启用 ACL,请执行以下操作:
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
向文档添加 ACL
如需在转换文档时计算并包含 AclInfo,请执行以下操作:
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
将内容设为公开
如需将文档设为可公开访问,请按如下所述设置 readers 字段:
Python
readers=[{"idp_wide": True}]
验证 ACL
如需验证 ACL 配置是否按预期运行,请考虑以下事项:
以无权访问相应文档的用户的身份进行搜索。
检查 Cloud Storage 中上传的文档结构,并将其与引用进行比较。
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
使用身份映射
在以下情况下使用身份映射:
您的第三方数据源使用非 Google 身份
您希望引用自定义群组(例如 wp-admins),而不是单个用户
API 仅返回群组名称
您需要手动对用户进行分组,以实现规模或一致性
如需执行身份映射,请按以下步骤操作:
- 创建并关联身份数据存储区。
导入外部身份(例如,external_group:wp-admins)。 导入时,请勿添加 external_group: 前缀,例如:
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }在文档的 ACL 信息中,定义
principal identifier中的外部实体 ID。引用自定义群组时,请在groupId字段中使用external_group:前缀。在导入期间,文档的 ACL 信息中的群组 ID 需要使用
external_group:前缀,但在将身份导入映射存储区时,系统不会使用该前缀。 包含身份映射的示例文档:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }