Questa pagina descrive come creare un connettore personalizzato.
Prima di iniziare
Prima di iniziare, assicurati di avere quanto segue:
Verifica se la fatturazione è attivata per il tuo Google Cloud progetto.
Installa e inizializza l'interfaccia a riga di comando di Google Cloud . Assicurati che sia autenticato per il tuo progetto.
Ottieni l'accesso amministratore di Discovery Engine per il tuo progetto Google Cloud .
Ottieni le credenziali di accesso per l'origine dati di terze parti (ad esempio chiavi API o autenticazione del database).
Crea un piano di mappatura dei dati chiaro. Deve includere i campi da indicizzare e come rappresentare controllo dell'accesso'accesso, comprese le identità di terze parti.
Crea connettore di base
Questa sezione mostra come creare un connettore personalizzato nella lingua scelta. I principi e i pattern mostrati qui si applicano a qualsiasi sistema esterno. Per creare un connettore di base, è sufficiente adattare le chiamate API e le trasformazioni dei dati per l'origine specifica nella lingua scelta.
Recupera dati
Per iniziare, recupera i dati dall'origine dati di terze parti. In questo esempio, mostriamo come recuperare i post utilizzando la paginazione. Per gli ambienti di produzione, consigliamo di utilizzare un approccio di streaming per i set di dati di grandi dimensioni. In questo modo si evitano problemi di memoria che possono verificarsi durante il caricamento di tutti i dati contemporaneamente.
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
Trasformare i dati
Per convertire i dati di origine nel formato di documento di Discovery Engine, strutturali come mostrato nel seguente payload di esempio. Puoi includere tutte le coppie chiave-valore che ti servono. Ad esempio, puoi includere l'intero contenuto per una ricerca completa. In alternativa, puoi includere campi strutturati per una ricerca per sfaccettature o una combinazione di entrambi.
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
Recuperare o creare un archivio delle identità
Per gestire le identità e i gruppi di utenti per controllo dell'accesso#39;accesso, devi recuperare o creare un archivio delle identità. Questa funzione recupera un archivio delle identità esistente in base a ID, progetto e località. Se l'archivio delle identità non esiste, ne crea uno nuovo e vuoto e lo restituisce.
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
La funzione get_or_create_ims_data_store utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: La Google Cloud posizione dell'archivio di mappatura delle identità.identity_mapping_store_id: un identificatore univoco per l'archivio delle identità.client_ims: un'istanza di discoveryengine.IdentityMappingStoreServiceClient utilizzata per interagire con l'API Identity Store.parent_ims: il nome della risorsa della località principale, creato utilizzando client_ims.location_path.name: il nome completo della risorsa dell'archivio di mapping delle identità, utilizzato per GetIdentityMappingStoreRequest.
Importare la mappatura delle identità nell'archivio delle identità
Utilizza questa funzione per caricare le voci di mappatura delle identità nell'identity store specificato. Prende un elenco di voci di mappatura delle identità e avvia un'operazione di importazione inline. Questo è fondamentale per stabilire le relazioni tra utenti, gruppi e identità esterne necessarie per controllo dell'accesso'accesso e la personalizzazione.
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
La funzione load_ims_data utilizza le seguenti variabili chiave:
ims_store: l'oggetto discoveryengine.DataStore che rappresenta l'archivio di mapping delle identità in cui verranno caricati i dati.id_mapping_data: un elenco di oggetti discoveryengine.IdentityMappingEntry, ognuno dei quali contiene un'identità esterna e il relativo ID utente o gruppo.result: Valore restituito di tipo discoveryengine.DataStore.
Crea datastore
Per utilizzare un connettore personalizzato, devi inizializzare un datastore per i tuoi contenuti. Utilizza default_collection per i connettori personalizzati. Il parametro IndustryVertical personalizza il comportamento del datastore per casi d'uso specifici. GENERIC è adatto alla maggior parte degli scenari. Tuttavia, puoi scegliere un valore diverso per un settore specifico, ad esempio MEDIA o HEALTHCARE_FHIR. Configura il nome visualizzato e altre proprietà in modo che siano in linea con le convenzioni e i requisiti di denominazione del progetto.
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
La funzione get_or_create_data_store utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: la posizione Google Cloud del datastore.display_name: Il nome visualizzato leggibile da una persona per il datastore.data_store_id: un identificatore univoco per il datastore.identity_mapping_store: Il nome della risorsa dell'archivio di mapping delle identità da associare.result: Valore restituito di tipo discoveryengine.DataStore.
Caricare documenti incorporati
Per inviare direttamente i documenti a Discovery Engine, utilizza il caricamento incorporato. Questo metodo utilizza la modalità di riconciliazione incrementale per impostazione predefinita e non supporta la modalità di riconciliazione completa. In modalità incrementale, vengono aggiunti nuovi documenti e quelli esistenti vengono aggiornati, ma i documenti non più presenti nell'origine non vengono eliminati. La modalità di riconciliazione completa sincronizza il datastore con i dati di origine, eliminando anche i documenti non più presenti nell'origine.
La riconciliazione incrementale è ideale per sistemi come un CRM che gestiscono modifiche frequenti e di piccole dimensioni ai dati. Anziché sincronizzare l'intero database, invia solo modifiche specifiche, rendendo il processo più rapido ed efficiente. Una sincronizzazione completa può comunque essere eseguita periodicamente per mantenere l'integrità complessiva dei dati.
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
La funzione upload_documents_inline utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: la posizione Google Cloud dell'datastore.data_store_id: l'ID del datastore.branch_id: l'ID del ramo all'interno del datastore (in genere "0").documents: Un elenco di oggetti discoveryengine.Document da caricare.result: Valore restituito di tipo discoveryengine.ImportDocumentsMetadata.
Convalida il connettore
Per verificare che il connettore funzioni come previsto, esegui un test per assicurarti che il flusso di dati dall'origine a Discovery Engine sia corretto.
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
Verifica che il codice del connettore utilizzi le seguenti variabili chiave:
SITE: l'URL di base dell'origine dati di terze parti.PROJECT_ID: l'ID progetto Google Cloud .LOCATION: la Google Cloud posizione delle risorse.IDENTITY_MAPPING_STORE_ID: un ID univoco per l'archivio di mapping delle identità.DATA_STORE_ID: un ID univoco per il tuo datastore.BRANCH_ID: l'ID del ramo all'interno del datastore.posts: memorizza i post recuperati dall'origine di terze parti.docs: Memorizza i documenti convertiti nel formato discoveryengine.Document.ims_store: l'oggetto discoveryengine.DataStore recuperato o creato per la mappatura delle identità.RAW_IDENTITY_MAPPING_DATA: un elenco di oggetti discoveryengine.IdentityMappingEntry.
Output previsto:
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
A questo punto, puoi anche visualizzare il tuo datastore nella console Google Google Cloud :
Crea connettore con Google Cloud caricamento di Storage
Sebbene l'importazione inline funzioni bene per lo sviluppo, i connettori di produzione devono utilizzare Google Cloud Storage per una migliore scalabilità e per attivare la modalità di riconciliazione completa. Questo approccio gestisce in modo efficiente i set di dati di grandi dimensioni e supporta l'eliminazione automatica dei documenti non più presenti nell'origine dati di terze parti.
Convertire documenti in JSONL
Per preparare i documenti per l'importazione collettiva in Discovery Engine, convertili in formato JSON Lines.
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
La funzione convert_documents_to_jsonl utilizza la seguente variabile:
documents: Un elenco di oggetti discoveryengine.Document da convertire.
Carica su Google Cloud Storage
Per attivare l'importazione collettiva efficiente, organizza i dati in Google Cloud Storage.
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
La funzione upload_jsonl_to_gcs utilizza le seguenti variabili chiave:
jsonl: i contenuti della stringa formattata in JSONL da caricare.bucket_name: il nome del bucket Storage. Google Cloudblob_name: il nome del blob (oggetto) all'interno del bucket specificato.
Importa da Google Cloud Storage con riconciliazione completa
Per eseguire una sincronizzazione completa dei dati utilizzando la modalità di riconciliazione completa, utilizza questo metodo. In questo modo, l'datastore rispecchia esattamente l'origine dati di terze parti, rimuovendo automaticamente tutti i documenti non più esistenti.
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
La funzione import_documents_from_gcs utilizza le seguenti variabili chiave:
project_id: l'ID del tuo Google Cloud progetto.location: la posizione Google Cloud dell'datastore.data_store_id: l'ID del datastore.branch_id: l'ID del ramo all'interno del datastore (in genere "0").gcs_uri: l'URI di Google Cloud Storage che rimanda al file JSONL.
Test Google Cloud Caricamento di Storage
Per verificare il flusso di lavoro di importazione basato sull'archiviazione Google Cloud , esegui quanto segue:
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
Le seguenti variabili chiave vengono utilizzate per testare il caricamento dell' Google Cloud Storage:
BUCKET: il nome del bucket Storage. Google CloudBLOB: il percorso del blob all'interno del bucket.SITE: l'URL di base dell'origine dati di terze parti.PROJECT_ID: l'ID progetto Google Cloud .LOCATION: la Google Cloud posizione delle risorse (ad es. "global").IDENTITY_MAPPING_STORE_ID: un ID univoco per l'archivio di mapping delle identità.DATA_STORE_ID: un ID univoco per il tuo datastore.BRANCH_ID: l'ID del ramo all'interno del datastore (in genere "0").jsonl_payload: i documenti convertiti nel formato JSONL.gcs_uri: l'URI Google Cloud Storage del file JSONL caricato.
Output previsto:
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
Gestisci autorizzazioni
Per gestire l'accesso a livello di documento negli ambienti aziendali, Gemini Enterprise supporta gli elenchi di controllo dell'accesso (ACL) e il mapping delle identità, che contribuiscono a limitare i contenuti che gli utenti possono visualizzare.
Abilita gli ACL nel datastore
Per abilitare le ACL durante la creazione del datastore, esegui il seguente comando:
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
Aggiungere ACL ai documenti
Per calcolare e includere AclInfo durante la trasformazione dei documenti, esegui il comando seguente:
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
Rendere pubblici i contenuti
Per rendere un documento accessibile pubblicamente, imposta il campo readers come segue:
Python
readers=[{"idp_wide": True}]
Convalidare gli ACL
Per verificare che le configurazioni ACL funzionino come previsto, considera quanto segue:
Esegui la ricerca come utente che non ha accesso al documento.
Ispeziona la struttura del documento caricato in Cloud Storage e confrontala con un riferimento.
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
Utilizzare la mappatura delle identità
Utilizza la mappatura delle identità per i seguenti scenari:
L'origine dati di terze parti utilizza identità non Google
Vuoi fare riferimento a gruppi personalizzati (ad es. wp-admins) anziché a singoli utenti
La tua API restituisce solo i nomi dei gruppi
Devi raggruppare manualmente gli utenti per scalabilità o coerenza
Per eseguire la mappatura delle identità, segui questi passaggi:
- Crea e collega il datastore delle identità.
Importa identità esterne (ad esempio, external_group:wp-admins). Non includere il prefisso external_group: durante l'importazione, ad esempio:
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }Nelle informazioni ACL del tuo documento, definisci l'ID entità esterna in
principal identifier. Quando fai riferimento a gruppi personalizzati, utilizza il prefissoexternal_group:nel campogroupId.Il prefisso
external_group:è obbligatorio per gli ID gruppo all'interno delle informazioni ACL del documento durante l'importazione, ma non viene utilizzato durante l'importazione delle identità nell'archivio di mapping. Esempio di documento con mappatura delle identità:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
Passaggi successivi
- Per fornire un'interfaccia utente per interrogare i dati, crea un'app in Gemini Enterprise e collegala al datastore connettore personalizzato esistente.