En esta página se describe cómo crear un conector personalizado.
Antes de empezar
Antes de empezar, comprueba que tienes lo siguiente:
Comprueba si la facturación está habilitada en tu Google Cloud proyecto.
Instala e inicializa la CLI de Google Cloud . Asegúrate de que esté autenticado en tu proyecto.
Obtén acceso de administrador de Discovery Engine para tu Google Cloud proyecto.
Obtén las credenciales de acceso de tu fuente de datos de terceros (como claves de API o autenticación de bases de datos).
Crea un plan de asignación de datos claro. Debe incluir qué campos se deben indexar y cómo representar el control de acceso, incluidas las identidades de terceros.
Crear un conector básico
En esta sección se muestra cómo crear un conector personalizado en el idioma que elijas. Los principios y patrones que se muestran aquí se aplican a cualquier sistema externo. Solo tienes que adaptar las llamadas a la API y las transformaciones de datos a tu fuente específica en el idioma que elijas para crear un conector básico.
Obtener datos
Para empezar, obtén datos de tu fuente de datos de terceros. En este ejemplo, mostramos cómo obtener publicaciones mediante la paginación. En los entornos de producción, recomendamos usar un enfoque de streaming para los conjuntos de datos de gran tamaño. De esta forma, se evitan problemas de memoria que pueden producirse al cargar todos los datos a la vez.
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
Transformar datos
Para convertir los datos de origen al formato de documento de Discovery Engine, estructúrelos como se muestra en la siguiente carga útil de ejemplo. Puede incluir tantos pares clave-valor como necesite. Por ejemplo, puedes incluir todo el contenido para hacer una búsqueda exhaustiva. También puedes incluir campos estructurados para una búsqueda por facetas o una combinación de ambos.
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
Recuperar o crear un almacén de identidades
Para gestionar las identidades y los grupos de usuarios con el fin de controlar el acceso, debes obtener o crear un almacén de identidades. Esta función obtiene un almacén de identidades por su ID, proyecto y ubicación. Si no existe, crea y devuelve un nuevo almacén de identidades vacío.
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
La función get_or_create_ims_data_store usa las siguientes variables de clave:
project_id: el ID de tu proyecto de Google Cloud .location: la Google Cloud ubicación de la tienda de mapeado de identidades.identity_mapping_store_id: identificador único del almacén de identidades.client_ims: una instancia de discoveryengine.IdentityMappingStoreServiceClient que se usa para interactuar con la API Identity Store.parent_ims: nombre del recurso de la ubicación principal, creado con client_ims.location_path.name: nombre completo del recurso del almacén de asignaciones de identidades, que se usa en GetIdentityMappingStoreRequest.
Ingerir mapeado de identidades en el almacén de identidades
Usa esta función para cargar entradas de mapeado de identidades en el almacén de identidades especificado. Toma una lista de entradas de asignación de identidades e inicia una operación de importación insertada. Esto es fundamental para establecer las relaciones de usuarios, grupos e identidades externas necesarias para el control de acceso y la personalización.
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
La función load_ims_data usa las siguientes variables clave:
ims_store: el objeto discoveryengine.DataStore que representa el almacén de mapeo de identidades en el que se cargarán los datos.id_mapping_data: lista de objetos discoveryengine.IdentityMappingEntry, cada uno de los cuales contiene una identidad externa y su ID de usuario o de grupo correspondiente.result: valor devuelto de tipo discoveryengine.DataStore.
Crear almacén de datos
Para usar un conector personalizado, debes inicializar un almacén de datos para tu contenido. Usa el default_collection para conectores personalizados. El parámetro IndustryVertical personaliza el comportamiento del almacén de datos para casos prácticos específicos. GENERIC es adecuado para la mayoría de los casos. Sin embargo, puede elegir un valor diferente para un sector concreto, como MEDIA o HEALTHCARE_FHIR. Configura el nombre visible y otras propiedades para que se ajusten a las convenciones y los requisitos de nomenclatura de tu proyecto.
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
La función get_or_create_data_store usa las siguientes variables de clave:
project_id: el ID de tu proyecto de Google Cloud .location: la Google Cloud ubicación del almacén de datos.display_name: nombre visible legible por humanos del almacén de datos.data_store_id: identificador único del almacén de datos.identity_mapping_store: nombre del recurso del almacén de mapeado de identidades que se va a vincular.result: valor devuelto de tipo discoveryengine.DataStore.
Subir documentos en línea
Para enviar documentos directamente a Discovery Engine, utiliza la subida insertada. Este método usa el modo de conciliación incremental de forma predeterminada y no admite el modo de conciliación completa. En el modo incremental, se añaden documentos nuevos y se actualizan los que ya hay, pero no se eliminan los documentos que ya no están en la fuente. El modo de conciliación completa sincroniza el almacén de datos con los datos de origen, lo que incluye la eliminación de los documentos que ya no están presentes en el origen.
La conciliación incremental es ideal para sistemas como los CRM, que gestionan cambios pequeños y frecuentes en los datos. En lugar de sincronizar toda la base de datos, solo se envían los cambios específicos, lo que hace que el proceso sea más rápido y eficiente. Sin embargo, se puede realizar una sincronización completa periódicamente para mantener la integridad general de los datos.
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
La función upload_documents_inline usa las siguientes variables clave:
project_id: el ID de tu proyecto de Google Cloud .location: la Google Cloud ubicación del almacén de datos.data_store_id: ID del almacén de datos.branch_id: ID de la rama del almacén de datos (normalmente, "0").documents: Lista de objetos discoveryengine.Document que se van a subir.result: valor devuelto de tipo discoveryengine.ImportDocumentsMetadata.
Validar el conector
Para validar que el conector funciona correctamente, haz una prueba para asegurarte de que los datos fluyen de la fuente a Discovery Engine.
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
Valida que el código del conector use las siguientes variables clave:
SITE: la URL base de la fuente de datos de terceros.PROJECT_ID: tu ID de proyecto Google Cloud .LOCATION: la Google Cloud ubicación de los recursos.IDENTITY_MAPPING_STORE_ID: ID único de tu tienda de mapeado de identidades.DATA_STORE_ID: ID único de tu almacén de datos.BRANCH_ID: ID de la rama del almacén de datos.posts: almacena las publicaciones obtenidas de la fuente de terceros.docs: almacena los documentos convertidos en formato discoveryengine.Document.ims_store: objeto discoveryengine.DataStore recuperado o creado para la asignación de identidades.RAW_IDENTITY_MAPPING_DATA: lista de objetos discoveryengine.IdentityMappingEntry.
Resultado esperado:
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
También puedes ver tu almacén de datos en la consola de Google Google Cloud :
Crear un conector con la opción de Google Cloud subida de almacenamiento
Aunque la importación insertada funciona bien para el desarrollo, los conectores de producción deben usar Google Cloud Storage para mejorar la escalabilidad y habilitar el modo de conciliación completa. Este enfoque gestiona grandes conjuntos de datos de forma eficiente y admite la eliminación automática de los documentos que ya no estén presentes en la fuente de datos de terceros.
Convertir documentos a JSONL
Para preparar documentos para la importación masiva en Discovery Engine, conviértalos al formato JSON Lines.
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
La función convert_documents_to_jsonl usa la siguiente variable:
documents: Lista de objetos discoveryengine.Document que se van a convertir.
Subir a Google Cloud Storage
Para habilitar la importación en bloque eficiente, organiza tus datos en Google Cloud Storage.
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
La función upload_jsonl_to_gcs usa las siguientes variables de clave:
jsonl: el contenido de la cadena con formato JSONL que se va a subir.bucket_name: el nombre del Google Cloud segmento de almacenamiento.blob_name: nombre del blob (objeto) del segmento especificado.
Importar desde Google Cloud Storage con conciliación completa
Para realizar una sincronización de datos completa con el modo de conciliación completa, usa este método. De esta forma, tu almacén de datos reflejará exactamente la fuente de datos de terceros y se eliminarán automáticamente los documentos que ya no existan.
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
La función import_documents_from_gcs usa las siguientes variables clave:
project_id: el ID de tu proyecto de Google Cloud .location: la Google Cloud ubicación del almacén de datos.data_store_id: ID del almacén de datos.branch_id: ID de la rama del almacén de datos (normalmente, "0").gcs_uri: el Google Cloud URI de almacenamiento que apunta al archivo JSONL.
Prueba de subida de Google Cloud almacenamiento
Para verificar el flujo de trabajo de importación basado en almacenamiento de Google Cloud , ejecuta lo siguiente:
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
En las pruebas de subida de Google Cloud Storage se usan las siguientes variables clave:
BUCKET: el nombre del Google Cloud segmento de almacenamiento.BLOB: la ruta al blob dentro del contenedor.SITE: la URL base de la fuente de datos de terceros.PROJECT_ID: tu ID de proyecto Google Cloud .LOCATION: la Google Cloud ubicación de los recursos (por ejemplo, "global").IDENTITY_MAPPING_STORE_ID: ID único de tu tienda de mapeado de identidades.DATA_STORE_ID: ID único de tu almacén de datos.BRANCH_ID: el ID de la rama del almacén de datos (normalmente, "0").jsonl_payload: los documentos convertidos al formato JSONL.gcs_uri: el Google Cloud URI de almacenamiento del archivo JSONL subido.
Resultado esperado:
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
Gestionar permisos
Para gestionar el acceso a nivel de documento en entornos empresariales, Gemini Enterprise admite listas de control de acceso (LCAs) y la asignación de identidades, lo que ayuda a limitar el contenido que pueden ver los usuarios.
Habilitar las listas de control de acceso en el almacén de datos
Para habilitar las listas de control de acceso al crear tu almacén de datos, ejecuta lo siguiente:
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
Añadir LCAs a documentos
Para calcular e incluir AclInfo al transformar los documentos, ejecuta lo siguiente:
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
Hacer público el contenido
Para que un documento sea accesible públicamente, define el campo readers de la siguiente manera:
Python
readers=[{"idp_wide": True}]
Validar LCAs
Para validar que las configuraciones de LCA funcionan correctamente, tenga en cuenta lo siguiente:
Buscar como un usuario que no tiene acceso al documento.
Inspecciona la estructura del documento subido en Cloud Storage y compárala con una referencia.
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
Usar el mapeado de identidades
Usa el mapeado de identidades en los siguientes casos:
Tu fuente de datos de terceros usa identidades ajenas a Google
Quieres hacer referencia a grupos personalizados (por ejemplo, wp-admins) en lugar de a usuarios concretos
Tu API solo devuelve nombres de grupos
Necesitas agrupar usuarios manualmente para conseguir escalabilidad o coherencia
Para hacer la asignación de identidades, sigue estos pasos:
- Crea y vincula el almacén de datos de identidad.
Importa identidades externas (por ejemplo, external_group:wp-admins). No incluyas el prefijo external_group: al importar. Por ejemplo:
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }En la información de la LCA de tu documento, define el ID de entidad externa en
principal identifier. Cuando hagas referencia a grupos personalizados, usa el prefijoexternal_group:en el campogroupId.El prefijo
external_group:es obligatorio para los IDs de grupo en la información de la lista de control de acceso del documento durante la importación, pero no se usa al importar identidades al almacén de asignaciones. Ejemplo de documento con mapeado de identidades:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
Siguientes pasos
- Para proporcionar una interfaz de usuario para consultar tus datos, crea una aplicación en Gemini Enterprise y conéctala al almacén de datos Conector personalizado.