En esta página, se describe cómo crear un conector personalizado.
Antes de comenzar
Antes de comenzar, asegúrate de tener lo siguiente:
Verifica si la facturación está habilitada en tu proyecto de Google Cloud .
Instala e inicializa la Google Cloud CLI. Asegúrate de que esté autenticado para tu proyecto.
Obtén acceso de administrador de Discovery Engine para tu proyecto Google Cloud .
Obtén credenciales de acceso para tu fuente de datos externa (como claves de API o autenticación de bases de datos).
Crea un plan claro de asignación de datos. Esto debe incluir qué campos indexar y cómo representar el control de acceso, incluidas las identidades de terceros.
Crea un conector básico
En esta sección, se muestra cómo crear un conector personalizado en el idioma que elijas. Los principios y patrones que se muestran aquí se aplican a cualquier sistema externo. Solo tienes que adaptar las llamadas a la API y las transformaciones de datos para tu fuente específica en el idioma que elijas para crear un conector básico.
Cómo obtener datos
Para comenzar, recupera datos de tu fuente de datos externa. En este ejemplo, demostramos cómo recuperar publicaciones con paginación. Para los entornos de producción, recomendamos usar un enfoque de transmisión para los conjuntos de datos grandes. Esto evita problemas de memoria que pueden ocurrir cuando se cargan todos los datos a la vez.
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
Transforma los datos
Para convertir tus datos de origen al formato de documento de Discovery Engine, estructúralos como se muestra en el siguiente ejemplo de carga útil. Puedes incluir tantos pares clave-valor como necesites. Por ejemplo, puedes incluir el contenido completo para realizar una búsqueda exhaustiva. También puedes incluir campos estructurados para una búsqueda facetada o una combinación de ambos.
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
Recupera o crea un almacén de identidades
Para administrar identidades y grupos de usuarios para el control de acceso, debes recuperar o crear un almacén de identidades. Esta función obtiene un almacén de identidades existente por su ID, proyecto y ubicación. Si no existe el almacén de identidades, crea y devuelve uno nuevo y vacío.
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
La función get_or_create_ims_data_store usa las siguientes variables clave:
project_id: Es el ID de tu proyecto de Google Cloud .location: Es la Google Cloud ubicación del almacén de asignaciones de identidades.identity_mapping_store_id: Es un identificador único para el almacén de identidades.client_ims: Es una instancia de discoveryengine.IdentityMappingStoreServiceClient que se usa para interactuar con la API de Identity Store.parent_ims: Es el nombre del recurso de la ubicación principal, que se construye con client_ims.location_path.name: Es el nombre completo del recurso del almacén de asignación de identidades, que se usa para GetIdentityMappingStoreRequest.
Ingiere la asignación de identidad en el almacén de identidades
Usa esta función para cargar entradas de asignación de identidades en el almacén de identidades especificado. Toma una lista de entradas de asignación de identidades y, luego, inicia una operación de importación intercalada. Esto es fundamental para establecer las relaciones de usuarios, grupos e identidades externas necesarias para el control de acceso y la personalización.
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
La función load_ims_data usa las siguientes variables clave:
ims_store: Es el objeto discoveryengine.DataStore que representa el almacén de asignación de identidad en el que se cargarán los datos.id_mapping_data: Es una lista de objetos discoveryengine.IdentityMappingEntry, cada uno de los cuales contiene una identidad externa y su ID de usuario o grupo correspondiente.result: Es el valor de devolución de tipo discoveryengine.DataStore.
Crear almacenamiento de datos
Para usar un conector personalizado, debes inicializar un almacén de datos para tu contenido. Usa default_collection para los conectores personalizados. El parámetro IndustryVertical personaliza el comportamiento del almacén de datos para casos de uso específicos. GENERIC es adecuado para la mayoría de las situaciones. Sin embargo, puedes elegir un valor diferente para una industria en particular, como MEDIA o HEALTHCARE_FHIR. Configura el nombre visible y otras propiedades para que se alineen con las convenciones de nombres y los requisitos de tu proyecto.
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
La función get_or_create_data_store usa las siguientes variables clave:
project_id: Es el ID de tu proyecto de Google Cloud .location: Es la Google Cloud ubicación del almacén de datos.display_name: Es el nombre visible del almacén de datos en lenguaje natural.data_store_id: Es un identificador único del almacén de datos.identity_mapping_store: Es el nombre del recurso del almacén de asignación de identidades que se vinculará.result: Es el valor de devolución del tipo discoveryengine.DataStore.
Cómo subir documentos intercalados
Para enviar documentos directamente a Discovery Engine, usa la carga intercalada. Este método usa el modo de conciliación incremental de forma predeterminada y no admite el modo de conciliación completa. En el modo incremental, se agregan documentos nuevos y se actualizan los existentes, pero no se borran los documentos que ya no están en la fuente. El modo de conciliación completa sincroniza el almacén de datos con tus datos de origen, lo que incluye borrar los documentos que ya no están presentes en el origen.
La reconciliación incremental es ideal para sistemas como un CRM que manejan cambios frecuentes y pequeños en los datos. En lugar de sincronizar toda la base de datos, solo se envían los cambios específicos, lo que hace que el proceso sea más rápido y eficiente. Aun así, se puede realizar una sincronización completa periódicamente para mantener la integridad general de los datos.
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
La función upload_documents_inline usa las siguientes variables clave:
project_id: Es el ID de tu proyecto de Google Cloud .location: Es la Google Cloud ubicación del almacén de datos.data_store_id: ID del almacén de datos.branch_id: Es el ID de la rama dentro del almacén de datos (por lo general, “0”).documents: Es una lista de objetos discoveryengine.Document que se subirán.result: Es el valor de devolución del tipo discoveryengine.ImportDocumentsMetadata.
Valida tu conector
Para validar que tu conector funcione según lo previsto, realiza una prueba para asegurarte de que los datos fluyan correctamente desde la fuente a Discovery Engine.
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
Valida que el código del conector use las siguientes variables clave:
SITE: Es la URL base de la fuente de datos externa.PROJECT_ID: Es el ID del proyecto de Google Cloud .LOCATION: Es la ubicación Google Cloud de los recursos.IDENTITY_MAPPING_STORE_ID: Es un ID único para tu almacén de asignación de identidades.DATA_STORE_ID: Es un ID único para tu almacén de datos.BRANCH_ID: Es el ID de la rama dentro del almacén de datos.posts: Almacena las publicaciones recuperadas de la fuente externa.docs: Almacena los documentos convertidos en formato discoveryengine.Document.ims_store: Es el objeto discoveryengine.DataStore recuperado o creado para la asignación de identidad.RAW_IDENTITY_MAPPING_DATA: Es una lista de objetos discoveryengine.IdentityMappingEntry.
Resultado esperado:
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
También puedes ver tu almacén de datos en la consola de Google Google Cloud en este punto:
Crea un conector con Google Cloud Carga de almacenamiento
Si bien la importación intercalada funciona bien para el desarrollo, los conectores de producción deben usar Google Cloud Storage para una mejor escalabilidad y para habilitar el modo de conciliación completa. Este enfoque maneja grandes conjuntos de datos de manera eficiente y admite el borrado automático de documentos que ya no están presentes en la fuente de datos externa.
Cómo convertir documentos a JSONL
Para preparar documentos para la importación masiva en Discovery Engine, conviértelos al formato de líneas JSON.
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
La función convert_documents_to_jsonl usa la siguiente variable:
documents: Es una lista de objetos discoveryengine.Document que se convertirán.
Subir a Google Cloud Storage
Para habilitar la importación masiva eficiente, organiza tus datos en Google Cloud Storage.
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
La función upload_jsonl_to_gcs usa las siguientes variables clave:
jsonl: Es el contenido de la cadena con formato JSONL que se subirá.bucket_name: Es el nombre del bucket de Google Cloud Storage.blob_name: Es el nombre del blob (objeto) dentro del bucket especificado.
Importar desde Google Cloud Storage con conciliación completa
Usa este método para realizar una sincronización de datos completa con el modo de conciliación total. Esto garantiza que tu almacén de datos refleje exactamente la fuente de datos externa, ya que se quitarán automáticamente los documentos que ya no existan.
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
La función import_documents_from_gcs usa las siguientes variables clave:
project_id: Es el ID de tu proyecto de Google Cloud .location: Es la Google Cloud ubicación del almacén de datos.data_store_id: ID del almacén de datos.branch_id: Es el ID de la rama dentro del almacén de datos (por lo general, “0”).gcs_uri: Es el URI de Google Cloud Storage que apunta al archivo JSONL.
Prueba de Google Cloud carga de almacenamiento
Para verificar el flujo de trabajo de importación basado en Google Cloud almacenamiento, ejecuta lo siguiente:
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
Las siguientes variables clave se usan para probar la carga de Google Cloud Storage:
BUCKET: Es el nombre del bucket de Google Cloud Storage.BLOB: Es la ruta de acceso al blob dentro del bucket.SITE: Es la URL base de la fuente de datos externa.PROJECT_ID: Es el ID del proyecto de Google Cloud .LOCATION: Es la ubicación Google Cloud de los recursos (p.ej., "global").IDENTITY_MAPPING_STORE_ID: Es un ID único para tu almacén de asignación de identidades.DATA_STORE_ID: Es un ID único para tu almacén de datos.BRANCH_ID: Es el ID de la rama dentro del almacén de datos (por lo general, “0”).jsonl_payload: Son los documentos convertidos al formato JSONL.gcs_uri: Es el URI de Google Cloud Storage del archivo JSONL subido.
Resultado esperado:
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
Administrar permisos
Para administrar el acceso a nivel de documento en entornos empresariales, Gemini Enterprise admite listas de control de acceso (LCA) y asignación de identidad, lo que ayuda a limitar el contenido que pueden ver los usuarios.
Habilita las ACL en el almacén de datos
Para habilitar las ACL cuando crees tu almacén de datos, ejecuta el siguiente comando:
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
Agrega LCA a los documentos
Para calcular e incluir AclInfo cuando transformes los documentos, ejecuta lo siguiente:
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
Cómo hacer público el contenido
Para que un documento sea de acceso público, configura el campo readers de la siguiente manera:
Python
readers=[{"idp_wide": True}]
Valida las LCA
Para validar que la configuración de las ACL funcione según lo previsto, ten en cuenta lo siguiente:
Realiza la búsqueda como un usuario que no tiene acceso al documento.
Inspecciona la estructura del documento subido en Cloud Storage y compárala con una referencia.
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
Usa la asignación de identidad
Usa la asignación de identidades en los siguientes casos:
Tu fuente de datos de terceros usa identidades ajenas a Google
Quieres hacer referencia a grupos personalizados (p. ej., administradores de WP) en lugar de usuarios individuales.
Tu API solo devuelve nombres de grupos
Debes agrupar a los usuarios de forma manual para lograr coherencia o escala.
Para realizar la asignación de identidad, sigue estos pasos:
- Crea y vincula el almacén de datos de identidad.
Importa identidades externas (por ejemplo, external_group:wp-admins). No incluyas el prefijo external_group: cuando importes, por ejemplo:
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }En la información de LCA de tu documento, define el ID de la entidad externa en
principal identifier. Cuando hagas referencia a grupos personalizados, usa el prefijoexternal_group:en el campogroupId.El prefijo
external_group:es obligatorio para los IDs de grupo dentro de la información de la ACL del documento durante la importación, pero no se usa cuando se importan identidades al almacén de asignaciones. Ejemplo de documento con asignación de identidad:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
¿Qué sigue?
- Para proporcionar una interfaz de usuario para consultar tus datos, crea una app en Gemini Enterprise y conéctala al almacén de datos existente Custom connector.