Questa pagina descrive come creare un connettore personalizzato.
Prima di iniziare
Prima di iniziare, assicurati di avere quanto segue:
Controlla se la fatturazione è abilitata per il tuo progetto Google Cloud .
Installa e inizializza l'interfaccia a riga di comando Google Cloud . Assicurati che sia autenticato per il tuo progetto.
Ottieni l'accesso amministratore di Discovery Engine per il tuo progetto Google Cloud .
Ottieni le credenziali di accesso per l'origine dati di terze parti (ad esempio chiavi API o autenticazione del database).
Crea un piano di mappatura dei dati chiaro. Devono essere inclusi i campi da indicizzare e come rappresentare il controllo dell'accesso, comprese le identità di terze parti.
Crea connettore di base
Questa sezione mostra come creare un connettore personalizzato nella lingua scelta. I principi e i pattern mostrati qui si applicano a qualsiasi sistema esterno. Per creare un connettore di base, è sufficiente adattare le chiamate API e le trasformazioni dei dati per l'origine specifica nella lingua scelta.
Recupera dati
Per iniziare, recupera i dati dall'origine dati di terze parti. In questo esempio, mostriamo come recuperare i post utilizzando la paginazione. Per gli ambienti di produzione, consigliamo di utilizzare un approccio di streaming per i set di dati di grandi dimensioni. In questo modo si evitano problemi di memoria che possono verificarsi durante il caricamento di tutti i dati contemporaneamente.
Riposo
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
Trasformare i dati
Per convertire i dati di origine nel formato del documento Discovery Engine, strutturali come mostrato nel seguente payload di esempio. Puoi includere tutte le coppie chiave-valore che ti servono. Ad esempio, puoi includere l'intero contenuto per una ricerca completa. In alternativa, puoi includere campi strutturati per una ricerca per sfaccettature o una combinazione di entrambi.
Riposo
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
Recuperare o creare un archivio delle identità
Per gestire le identità e i gruppi di utenti per il controllo dell'accesso, devi recuperare o creare un archivio delle identità. Questa funzione recupera un archivio delle identità esistente in base a ID, progetto e località. Se l'archivio delle identità non esiste, ne crea uno nuovo e vuoto e lo restituisce.
Riposo
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
La funzione get_or_create_ims_data_store utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: la Google Cloud posizione dell'archivio di mappatura delle identità.identity_mapping_store_id: un identificatore univoco per l'identity store.client_ims: un'istanza di discoveryengine.IdentityMappingStoreServiceClient utilizzata per interagire con l'API Identity Store.parent_ims: il nome della risorsa della località principale, creato utilizzando client_ims.location_path.name: Il nome completo della risorsa dell'archivio di mapping delle identità, utilizzato per GetIdentityMappingStoreRequest.
Importare la mappatura delle identità nell'archivio delle identità
Utilizza questa funzione per caricare le voci di mappatura delle identità nell'identity store specificato. Prende un elenco di voci di mappatura delle identità e avvia un'operazione di importazione in linea. Questo è fondamentale per stabilire le relazioni tra utenti, gruppi e identità esterne necessarie per il controllo dell'accesso e la personalizzazione.
Riposo
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
La funzione load_ims_data utilizza le seguenti variabili chiave:
ims_store: l'oggetto discoveryengine.DataStore che rappresenta l'archivio di mapping delle identità in cui verranno caricati i dati.id_mapping_data: un elenco di oggetti discoveryengine.IdentityMappingEntry, ognuno dei quali contiene un'identità esterna e il relativo ID utente o gruppo.result: Valore restituito di tipo discoveryengine.DataStore.
Crea datastore
Per utilizzare un connettore personalizzato, devi inizializzare un datastore per i tuoi contenuti. Utilizza default_collection per i connettori personalizzati. Il parametro IndustryVertical personalizza il comportamento dell'archivio dati per casi d'uso specifici. GENERIC è adatto alla maggior parte degli scenari. Tuttavia, puoi scegliere un valore diverso per un settore specifico, ad esempio MEDIA o HEALTHCARE_FHIR. Configura il nome visualizzato e altre proprietà in modo che siano in linea con le convenzioni e i requisiti di denominazione del progetto.
Riposo
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
La funzione get_or_create_data_store utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: la posizione Google Cloud dell'archivio dati.display_name: Il nome visualizzato leggibile da una persona per il datastore.data_store_id: un identificatore univoco per il datastore.identity_mapping_store: Il nome della risorsa dell'archivio di mapping delle identità da associare.result: Valore restituito di tipo discoveryengine.DataStore.
Caricare documenti incorporati
Per inviare direttamente i documenti a Discovery Engine, utilizza il caricamento incorporato. Questo metodo utilizza la modalità di riconciliazione incrementale per impostazione predefinita e non supporta la modalità di riconciliazione completa. In modalità incrementale, vengono aggiunti nuovi documenti e quelli esistenti vengono aggiornati, ma i documenti non più presenti nell'origine non vengono eliminati. La modalità di riconciliazione completa sincronizza il datastore con i dati di origine, eliminando anche i documenti non più presenti nell'origine.
La riconciliazione incrementale è ideale per sistemi come un CRM che gestiscono modifiche frequenti e di piccole dimensioni ai dati. Invece di sincronizzare l'intero database, invia solo modifiche specifiche, rendendo il processo più rapido ed efficiente.
Come best practice, esegui una sincronizzazione completa iniziale, seguita da sincronizzazioni incrementali più frequenti.
Riposo
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
La funzione upload_documents_inline utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: la posizione Google Cloud dell'archivio dati.data_store_id: l'ID del datastore.branch_id: l'ID del ramo all'interno del datastore (in genere "0").documents: Un elenco di oggetti discoveryengine.Document da caricare.result: Valore restituito di tipo discoveryengine.ImportDocumentsMetadata.
Il campo uri all'interno dell'oggetto discoveryengine.Document viene utilizzato per indicare la fonte dei contenuti importati, come byte non elaborati o un URI in Google Cloud Storage. È diverso dall'URI dei contenuti di origine di terze parti. L'URI dei contenuti dell'origine di terze parti deve essere definito come campo all'interno del payload json_data del documento. Ad esempio, nella funzione convert_posts_to_documents, il campo url nel payload svolge questa funzione.
Convalida il connettore
Per verificare che il connettore funzioni come previsto, esegui un test per assicurarti che il flusso di dati dall'origine a Discovery Engine sia corretto.
Riposo
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
Convalida che il codice del connettore utilizzi le seguenti variabili chiave:
SITE: l'URL di base dell'origine dati di terze parti.PROJECT_ID: l'ID progetto Google Cloud .LOCATION: la Google Cloud posizione delle risorse.IDENTITY_MAPPING_STORE_ID: un ID univoco per l'archivio di mapping delle identità.DATA_STORE_ID: un ID univoco per il tuo datastore.BRANCH_ID: l'ID del ramo all'interno del datastore.posts: memorizza i post recuperati dall'origine di terze parti.docs: Memorizza i documenti convertiti nel formato discoveryengine.Document.ims_store: l'oggetto discoveryengine.DataStore recuperato o creato per la mappatura delle identità.RAW_IDENTITY_MAPPING_DATA: un elenco di oggetti discoveryengine.IdentityMappingEntry.
Output previsto:
Conchiglia
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
A questo punto, puoi anche visualizzare il tuo datastore nella console Google Google Cloud :
Quando viene eseguita una ricerca, i campi del payload vengono utilizzati per la ricerca e il campo URI del payload viene utilizzato per le citazioni. I tre campi delle proprietà chiave identificati da Gemini Enterprise sono title, description e uri. I campi del documento corrispondenti possono avere nomi diversi e la loro mappatura può essere eseguita utilizzando l'opzione Schema -> Modifica nella console Google Cloud .
Crea connettore con Google Cloud caricamento dello spazio di archiviazione
Sebbene l'importazione inline funzioni bene per lo sviluppo, i connettori di produzione devono utilizzare Google Cloud Storage per una migliore scalabilità e per attivare la modalità di riconciliazione completa. Questo approccio gestisce in modo efficiente i set di dati di grandi dimensioni e supporta l'eliminazione automatica dei documenti non più presenti nell'origine dati di terze parti.
Convertire documenti in JSONL
Per preparare i documenti per l'importazione collettiva in Discovery Engine, convertili in formato JSON Lines.
Riposo
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
La funzione convert_documents_to_jsonl utilizza la seguente variabile:
documents: Un elenco di oggetti discoveryengine.Document da convertire.
Carica su Google Cloud Storage
Per attivare l'importazione collettiva efficiente, organizza i dati in Google Cloud Storage.
Riposo
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
La funzione upload_jsonl_to_gcs utilizza le seguenti variabili chiave:
jsonl: i contenuti della stringa formattata in JSONL da caricare.bucket_name: il nome del bucket di archiviazione Google Cloud .blob_name: il nome del blob (oggetto) all'interno del bucket specificato.
Importa da Google Cloud Storage con riconciliazione completa
Per eseguire una sincronizzazione completa dei dati utilizzando la modalità di riconciliazione completa, utilizza questo metodo. In questo modo, l'archivio dati rispecchia esattamente l'origine dati di terze parti, rimuovendo automaticamente tutti i documenti non più esistenti.
Quando importi da Google Cloud Storage, tieni presente le seguenti limitazioni:
- Una singola richiesta di importazione può contenere al massimo 100 file o 100.000 file se il parametro
dataSchemaè impostato sucontent. Ogni file può avere dimensioni fino a 2 GB o 100 MB se il parametro
dataSchemaè impostato sucontent.
Riposo
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
La funzione import_documents_from_gcs utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: la posizione Google Cloud dell'archivio dati.data_store_id: l'ID del datastore.branch_id: l'ID del ramo all'interno del datastore (in genere "0").gcs_uri: l'URI di Google Cloud Storage che punta al file JSONL.
Test Google Cloud Caricamento di Storage
Per verificare il flusso di lavoro di importazione basato sull'archiviazione Google Cloud , esegui quanto segue:
Riposo
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
Le seguenti variabili chiave vengono utilizzate per testare il caricamento dell'archivio Google Cloud :
BUCKET: il nome del bucket di archiviazione Google Cloud .BLOB: il percorso del blob all'interno del bucket.SITE: l'URL di base dell'origine dati di terze parti.PROJECT_ID: l'ID progetto Google Cloud .LOCATION: La Google Cloud posizione delle risorse (ad es. "global").IDENTITY_MAPPING_STORE_ID: un ID univoco per l'archivio di mapping delle identità.DATA_STORE_ID: un ID univoco per il tuo datastore.BRANCH_ID: l'ID del ramo all'interno del datastore (in genere "0").jsonl_payload: i documenti convertiti in formato JSONL.gcs_uri: l'URI Google Cloud Storage del file JSONL caricato.
Output previsto:
Conchiglia
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
Gestisci autorizzazioni
Per gestire l'accesso a livello di documento negli ambienti aziendali, Gemini Enterprise supporta gli elenchi di controllo dell'accesso (ACL) e il mapping delle identità, che contribuiscono a limitare i contenuti che gli utenti possono visualizzare.
Abilita gli ACL nel datastore
Per abilitare le ACL durante la creazione del datastore, esegui il seguente comando:
Riposo
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
Aggiungere ACL ai documenti
Per calcolare e includere AclInfo durante la trasformazione dei documenti, esegui il seguente comando:
Riposo
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
Rendere pubblici i contenuti
Per rendere un documento accessibile pubblicamente, imposta il campo readers come segue:
Riposo
readers=[{"idp_wide": True}]
Convalidare gli ACL
Per verificare che le configurazioni ACL funzionino come previsto, considera quanto segue:
Esegui la ricerca come utente che non ha accesso al documento.
Ispeziona la struttura del documento caricato in Cloud Storage e confrontala con un riferimento.
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
Utilizzare la mappatura delle identità
Utilizza la mappatura delle identità per i seguenti scenari:
L'origine dati di terze parti utilizza identità non Google
Vuoi fare riferimento a gruppi personalizzati (ad es. wp-admins) anziché a singoli utenti
La tua API restituisce solo i nomi dei gruppi
Devi raggruppare manualmente gli utenti per scalabilità o coerenza
Il tuo sistema di terze parti implementa gli ACL utilizzando gruppi non basati su IDP e vuoi rispettare questi ACL utilizzando il datastore personalizzato in Gemini Enterprise.
Per mappare le identità:
- Crea e collega il datastore delle identità.
Importa identità esterne (ad esempio,
external_group:wp-admins). Non includereexternal_group: prefixdurante l'importazione, ad esempio:JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }In Informazioni ACL del tuo documento, definisci l'ID entità esterno in
principal identifier. Quando fai riferimento a gruppi personalizzati, utilizza il prefissoexternal_group:nel campogroupId.Il prefisso
external_group:è obbligatorio per gli ID gruppo all'interno delle informazioni ACL del documento durante l'importazione, ma non viene utilizzato durante l'importazione delle identità nell'archivio di mapping. Esempio di documento con mappatura delle identità:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
Passaggi successivi
- Per fornire un'interfaccia utente per fare query sui dati, crea un'app in Gemini Enterprise e collegala al datastore connettore personalizzato esistente.