Crea connettore personalizzato

Questa pagina descrive come creare un connettore personalizzato.

Prima di iniziare

Prima di iniziare, assicurati di avere quanto segue:

  • Verifica se la fatturazione è attivata per il tuo Google Cloud progetto.

  • Installa e inizializza l'interfaccia a riga di comando di Google Cloud . Assicurati che sia autenticato per il tuo progetto.

  • Ottieni l'accesso amministratore di Discovery Engine per il tuo progetto Google Cloud .

  • Ottieni le credenziali di accesso per l'origine dati di terze parti (ad esempio chiavi API o autenticazione del database).

  • Crea un piano di mappatura dei dati chiaro. Deve includere i campi da indicizzare e come rappresentare controllo dell'accesso'accesso, comprese le identità di terze parti.

Crea connettore di base

Questa sezione mostra come creare un connettore personalizzato nella lingua scelta. I principi e i pattern mostrati qui si applicano a qualsiasi sistema esterno. Per creare un connettore di base, è sufficiente adattare le chiamate API e le trasformazioni dei dati per l'origine specifica nella lingua scelta.

Recupera dati

Per iniziare, recupera i dati dall'origine dati di terze parti. In questo esempio, mostriamo come recuperare i post utilizzando la paginazione. Per gli ambienti di produzione, consigliamo di utilizzare un approccio di streaming per i set di dati di grandi dimensioni. In questo modo si evitano problemi di memoria che possono verificarsi durante il caricamento di tutti i dati contemporaneamente.

Python

    def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
        #Fetch all posts from the given site.#
        url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
        posts: List[dict] = []
        page = 1
        while True:
            resp = requests.get(
                url,
                params={"page": page, "per_page": per_page},
            )
            resp.raise_for_status()
            batch = resp.json()
            posts.extend(batch)
            if len(batch) < per_page:
                break
            page += 1
        return posts

Trasformare i dati

Per convertire i dati di origine nel formato di documento di Discovery Engine, strutturali come mostrato nel seguente payload di esempio. Puoi includere tutte le coppie chiave-valore che ti servono. Ad esempio, puoi includere l'intero contenuto per una ricerca completa. In alternativa, puoi includere campi strutturati per una ricerca per sfaccettature o una combinazione di entrambi.

Python

    def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
        # Convert WP posts into Discovery Engine Document messages.
        docs: List[discoveryengine.Document] = []
        for post in posts:
            payload = {
                "title": post.get("title", {}).get("rendered"),
                "body": post.get("content", {}).get("rendered"),
                "url": post.get("link"),
                "author": post.get("author"),
                "categories": post.get("categories"),
                "tags": post.get("tags"),
                "date": post.get("date"),
            }
            doc = discoveryengine.Document(
                id=str(post["id"]),
                json_data=json.dumps(payload),
            )
            docs.append(doc)
        return docs

Recuperare o creare un archivio delle identità

Per gestire le identità e i gruppi di utenti per controllo dell'accesso#39;accesso, devi recuperare o creare un archivio delle identità. Questa funzione recupera un archivio delle identità esistente in base a ID, progetto e località. Se l'archivio delle identità non esiste, ne crea uno nuovo e vuoto e lo restituisce.

Python

    def get_or_create_ims_data_store(
        project_id: str,
        location: str,
        identity_mapping_store_id: str,
    ) -> discoveryengine.DataStore:
      """Get or create a DataStore."""
      # Initialize the client
      client_ims = discoveryengine.IdentityMappingStoreServiceClient()
      # Construct the parent resource name
      parent_ims = client_ims.location_path(project=project_id, location=location)

      try:
        # Create the request object
        name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
        request = discoveryengine.GetIdentityMappingStoreRequest(
            name=name,
        )
        return client_ims.get_identity_mapping_store(request=request)
      except:
        # Create the IdentityMappingStore object (it can be empty for basic creation)
        identity_mapping_store = discoveryengine.IdentityMappingStore()
        # Create the request object
        request = discoveryengine.CreateIdentityMappingStoreRequest(
            parent=parent_ims,
            identity_mapping_store=identity_mapping_store,
            identity_mapping_store_id=identity_mapping_store_id,
        )
        return client_ims.create_identity_mapping_store(request=request)

La funzione get_or_create_ims_data_store utilizza le seguenti variabili chiave:

  • project_id: l'ID del tuo Google Cloud progetto.
  • location: La Google Cloud posizione dell'archivio di mappatura delle identità.
  • identity_mapping_store_id: un identificatore univoco per l'archivio delle identità.
  • client_ims: un'istanza di discoveryengine.IdentityMappingStoreServiceClient utilizzata per interagire con l'API Identity Store.
  • parent_ims: il nome della risorsa della località principale, creato utilizzando client_ims.location_path.
  • name: il nome completo della risorsa dell'archivio di mapping delle identità, utilizzato per GetIdentityMappingStoreRequest.

Importare la mappatura delle identità nell'archivio delle identità

Utilizza questa funzione per caricare le voci di mappatura delle identità nell'identity store specificato. Prende un elenco di voci di mappatura delle identità e avvia un'operazione di importazione inline. Questo è fondamentale per stabilire le relazioni tra utenti, gruppi e identità esterne necessarie per controllo dell'accesso'accesso e la personalizzazione.

Python

def load_ims_data(
    ims_store: discoveryengine.DataStore,
    id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
  """Get the IMS data store."""
  # Initialize the client
  client_ims = discoveryengine.IdentityMappingStoreServiceClient()

  #  Create the InlineSource object
  inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
      identity_mapping_entries=id_mapping_data
  )

  # Create the main request object
  request_ims = discoveryengine.ImportIdentityMappingsRequest(
      identity_mapping_store=ims_store.name,
      inline_source=inline_source,
  )

  try:
    # Create the InlineSource object, which holds your list of entries
    operation = client_ims.import_identity_mappings(
        request=request_ims,
    )
    result = operation.result()
    return result

  except Exception as e:
    print(f"IMS Load Error: {e}")
    result = operation.result()
    return result

La funzione load_ims_data utilizza le seguenti variabili chiave:

  • ims_store: l'oggetto discoveryengine.DataStore che rappresenta l'archivio di mapping delle identità in cui verranno caricati i dati.
  • id_mapping_data: un elenco di oggetti discoveryengine.IdentityMappingEntry, ognuno dei quali contiene un'identità esterna e il relativo ID utente o gruppo.
  • result: Valore restituito di tipo discoveryengine.DataStore.

Crea datastore

Per utilizzare un connettore personalizzato, devi inizializzare un datastore per i tuoi contenuti. Utilizza default_collection per i connettori personalizzati. Il parametro IndustryVertical personalizza il comportamento del datastore per casi d'uso specifici. GENERIC è adatto alla maggior parte degli scenari. Tuttavia, puoi scegliere un valore diverso per un settore specifico, ad esempio MEDIA o HEALTHCARE_FHIR. Configura il nome visualizzato e altre proprietà in modo che siano in linea con le convenzioni e i requisiti di denominazione del progetto.

Python

def get_or_create_data_store(
    project_id: str,
    location: str,
    display_name: str,
    data_store_id: str,
    identity_mapping_store: str,
) -> discoveryengine.DataStore:
  """Get or create a DataStore."""
  client = discoveryengine.DataStoreServiceClient()
  ds_name = client.data_store_path(project_id, location, data_store_id)
  try:
    result = client.get_data_store(request={"name": ds_name})
    return result
  except:
    parent = client.collection_path(project_id, location, "default_collection")
    operation = client.create_data_store(
        request={
            "parent": parent,
            "data_store": discoveryengine.DataStore(
                display_name=display_name,
                acl_enabled=True,
                industry_vertical=discoveryengine.IndustryVertical.GENERIC,
                identity_mapping_store=identity_mapping_store,
            ),
            "data_store_id": data_store_id,
        }
    )
    result = operation.result()
    return result

La funzione get_or_create_data_store utilizza le seguenti variabili chiave:

  • project_id: l'ID del tuo Google Cloud progetto.
  • location: la posizione Google Cloud del datastore.
  • display_name: Il nome visualizzato leggibile da una persona per il datastore.
  • data_store_id: un identificatore univoco per il datastore.
  • identity_mapping_store: Il nome della risorsa dell'archivio di mapping delle identità da associare.
  • result: Valore restituito di tipo discoveryengine.DataStore.

Caricare documenti incorporati

Per inviare direttamente i documenti a Discovery Engine, utilizza il caricamento incorporato. Questo metodo utilizza la modalità di riconciliazione incrementale per impostazione predefinita e non supporta la modalità di riconciliazione completa. In modalità incrementale, vengono aggiunti nuovi documenti e quelli esistenti vengono aggiornati, ma i documenti non più presenti nell'origine non vengono eliminati. La modalità di riconciliazione completa sincronizza il datastore con i dati di origine, eliminando anche i documenti non più presenti nell'origine.

La riconciliazione incrementale è ideale per sistemi come un CRM che gestiscono modifiche frequenti e di piccole dimensioni ai dati. Anziché sincronizzare l'intero database, invia solo modifiche specifiche, rendendo il processo più rapido ed efficiente. Una sincronizzazione completa può comunque essere eseguita periodicamente per mantenere l'integrità complessiva dei dati.

Python

    def upload_documents_inline(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        documents: List[discoveryengine.Document],
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Inline import of Document messages."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
                documents=documents,
            ),
        )
        operation = client.import_documents(request=request)
        operation.result()
        result = operation.metadata
        return result

La funzione upload_documents_inline utilizza le seguenti variabili chiave:

  • project_id: l'ID del tuo Google Cloud progetto.
  • location: la posizione Google Cloud dell'datastore.
  • data_store_id: l'ID del datastore.
  • branch_id: l'ID del ramo all'interno del datastore (in genere "0").
  • documents: Un elenco di oggetti discoveryengine.Document da caricare.
  • result: Valore restituito di tipo discoveryengine.ImportDocumentsMetadata.

Convalida il connettore

Per verificare che il connettore funzioni come previsto, esegui un test per assicurarti che il flusso di dati dall'origine a Discovery Engine sia corretto.

Python

    SITE = "https://altostrat.com"
    PROJECT_ID = "ucs-3p-connectors-testing"
    LOCATION = "global"
    IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
    DATA_STORE_ID = "my-acl-ds-id1"
    BRANCH_ID = "0"

    posts = fetch_posts(SITE)
    docs = convert_posts_to_documents(posts)
    print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")

    try:
      # Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
      ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
      print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")

      RAW_IDENTITY_MAPPING_DATA = [
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_1",
              user_id="testuser1@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              user_id="testuser2@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              group_id="testgroup1@example.com",
          )
      ]

      # Step #2: Load IMS Data
      response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
      print(
          "\nStep #2: Load Data in IMS Store successful.", response
      )

      # Step #3: Create Entity Data Store & Bind IMS Data Store
      data_store =  get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
      print("\nStep #3: Entity Data Store Create Result: ", data_store)

      metadata = upload_documents_inline(
          PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
      )
      print(f"Uploaded {metadata.success_count} documents inline.")

    except gcp_exceptions.GoogleAPICallError as e:
      print(f"\n--- API Call Failed ---")
      print(f"Server Error Message: {e.message}")
      print(f"Status Code: {e.code}")

    except Exception as e:
      print(f"An error occurred: {e}")

Verifica che il codice del connettore utilizzi le seguenti variabili chiave:

  • SITE: l'URL di base dell'origine dati di terze parti.
  • PROJECT_ID: l'ID progetto Google Cloud .
  • LOCATION: la Google Cloud posizione delle risorse.
  • IDENTITY_MAPPING_STORE_ID: un ID univoco per l'archivio di mapping delle identità.
  • DATA_STORE_ID: un ID univoco per il tuo datastore.
  • BRANCH_ID: l'ID del ramo all'interno del datastore.
  • posts: memorizza i post recuperati dall'origine di terze parti.
  • docs: Memorizza i documenti convertiti nel formato discoveryengine.Document.
  • ims_store: l'oggetto discoveryengine.DataStore recuperato o creato per la mappatura delle identità.
  • RAW_IDENTITY_MAPPING_DATA: un elenco di oggetti discoveryengine.IdentityMappingEntry.

Output previsto:

Shell

  Fetched 20 posts and converted to 20 documents.
  STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
  Step #2: Load Data in IMS Store successful.
  Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
  display_name: "my-acl-datastore"
  industry_vertical: GENERIC
  create_time {
    seconds: 1760906997
    nanos: 192641000
  }
  default_schema_id: "default_schema"
  acl_enabled: true
  identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
  Uploaded 20 documents inline.

A questo punto, puoi anche visualizzare il tuo datastore nella console Google Google Cloud :

Datastore del connettore personalizzato
Datastore del connettore personalizzato.

Crea connettore con Google Cloud caricamento di Storage

Sebbene l'importazione inline funzioni bene per lo sviluppo, i connettori di produzione devono utilizzare Google Cloud Storage per una migliore scalabilità e per attivare la modalità di riconciliazione completa. Questo approccio gestisce in modo efficiente i set di dati di grandi dimensioni e supporta l'eliminazione automatica dei documenti non più presenti nell'origine dati di terze parti.

Convertire documenti in JSONL

Per preparare i documenti per l'importazione collettiva in Discovery Engine, convertili in formato JSON Lines.

Python

    def convert_documents_to_jsonl(
        documents: List[discoveryengine.Document],
    ) -> str:
        """Serialize Document messages to JSONL."""
        return "\n".join(
            discoveryengine.Document.to_json(doc, indent=None)
            for doc in documents
        ) + "\n"

La funzione convert_documents_to_jsonl utilizza la seguente variabile:

  • documents: Un elenco di oggetti discoveryengine.Document da convertire.

Carica su Google Cloud Storage

Per attivare l'importazione collettiva efficiente, organizza i dati in Google Cloud Storage.

Python

    def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
        """Upload JSONL content to Google Cloud Storage."""
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(jsonl, content_type="application/json")
        return f"gs://{bucket_name}/{blob_name}"

La funzione upload_jsonl_to_gcs utilizza le seguenti variabili chiave:

  • jsonl: i contenuti della stringa formattata in JSONL da caricare.
  • bucket_name: il nome del bucket Storage. Google Cloud
  • blob_name: il nome del blob (oggetto) all'interno del bucket specificato.

Importa da Google Cloud Storage con riconciliazione completa

Per eseguire una sincronizzazione completa dei dati utilizzando la modalità di riconciliazione completa, utilizza questo metodo. In questo modo, l'datastore rispecchia esattamente l'origine dati di terze parti, rimuovendo automaticamente tutti i documenti non più esistenti.

Python

    def import_documents_from_gcs(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        gcs_uri: str,
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            gcs_source=gcs_source,
            reconciliation_mode=
                discoveryengine.ImportDocumentsRequest
                .ReconciliationMode.FULL,
        )
        operation = client.import_documents(request=request)
        operation.result()
        return operation.metadata

La funzione import_documents_from_gcs utilizza le seguenti variabili chiave:

  • project_id: l'ID del tuo Google Cloud progetto.
  • location: la posizione Google Cloud dell'datastore.
  • data_store_id: l'ID del datastore.
  • branch_id: l'ID del ramo all'interno del datastore (in genere "0").
  • gcs_uri: l'URI di Google Cloud Storage che rimanda al file JSONL.

Test Google Cloud Caricamento di Storage

Per verificare il flusso di lavoro di importazione basato sull'archiviazione Google Cloud , esegui quanto segue:

Python

  BUCKET = "your-existing-bucket"
  BLOB = "path-to-any-blob/wp/posts.jsonl"
  SITE = "https://altostrat.com"
  PROJECT_ID = "ucs-3p-connectors-testing"
  LOCATION = "global"
  IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
  DATA_STORE_ID = "your-data-store-id"
  BRANCH_ID = "0"
  jsonl_payload = convert_documents_to_jsonl(docs)
  gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
  posts = fetch_posts(SITE)
  docs = convert_posts_to_documents(posts)
  print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
  print("Uploaded to:", gcs_uri)

  metadata = import_documents_from_gcs(
      PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
  )
  print(f"Imported: {metadata.success_count} documents")

Le seguenti variabili chiave vengono utilizzate per testare il caricamento dell' Google Cloud Storage:

  • BUCKET: il nome del bucket Storage. Google Cloud
  • BLOB: il percorso del blob all'interno del bucket.
  • SITE: l'URL di base dell'origine dati di terze parti.
  • PROJECT_ID: l'ID progetto Google Cloud .
  • LOCATION: la Google Cloud posizione delle risorse (ad es. "global").
  • IDENTITY_MAPPING_STORE_ID: un ID univoco per l'archivio di mapping delle identità.
  • DATA_STORE_ID: un ID univoco per il tuo datastore.
  • BRANCH_ID: l'ID del ramo all'interno del datastore (in genere "0").
  • jsonl_payload: i documenti convertiti nel formato JSONL.
  • gcs_uri: l'URI Google Cloud Storage del file JSONL caricato.

Output previsto:

Shell

    Fetched 20 posts and converted to 20 documents.
    Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
    Imported: 20 documents

Gestisci autorizzazioni

Per gestire l'accesso a livello di documento negli ambienti aziendali, Gemini Enterprise supporta gli elenchi di controllo dell'accesso (ACL) e il mapping delle identità, che contribuiscono a limitare i contenuti che gli utenti possono visualizzare.

Abilita gli ACL nel datastore

Per abilitare le ACL durante la creazione del datastore, esegui il seguente comando:

Python

  # get_or_create_data_store()
  "data_store": discoveryengine.DataStore(
      display_name=data_store_id,
      industry_vertical=discoveryengine.IndustryVertical.GENERIC,
      acl_enabled=True, # ADDED
  )

Aggiungere ACL ai documenti

Per calcolare e includere AclInfo durante la trasformazione dei documenti, esegui il comando seguente:

Python

  # convert_posts_to_documents()
  doc = discoveryengine.Document(
      id=str(post["id"]),
      json_data=json.dumps(payload),
      acl_info=discoveryengine.Document.AclInfo(
          readers=[{
              "principals": [
                  {"user_id": "baklavainthebalkans@gmail.com"},
                  {"user_id": "cloudysanfrancisco@gmail.com"}
              ]
          }]
      ),
  )

Rendere pubblici i contenuti

Per rendere un documento accessibile pubblicamente, imposta il campo readers come segue:

Python

  readers=[{"idp_wide": True}]

Convalidare gli ACL

Per verificare che le configurazioni ACL funzionino come previsto, considera quanto segue:

  • Esegui la ricerca come utente che non ha accesso al documento.

  • Ispeziona la struttura del documento caricato in Cloud Storage e confrontala con un riferimento.

JSON

  {
    "id": "108",
    "jsonData": "{...}",
    "aclInfo": {
      "readers": [
        {
          "principals": [
            { "userId": "baklavainthebalkans@gmail.com" },
            { "userId": "cloudysanfrancisco@gmail.com" }
          ],
          "idpWide": false
        }
      ]
    }
  }

Utilizzare la mappatura delle identità

Utilizza la mappatura delle identità per i seguenti scenari:

  • L'origine dati di terze parti utilizza identità non Google

  • Vuoi fare riferimento a gruppi personalizzati (ad es. wp-admins) anziché a singoli utenti

  • La tua API restituisce solo i nomi dei gruppi

  • Devi raggruppare manualmente gli utenti per scalabilità o coerenza

Per eseguire la mappatura delle identità, segui questi passaggi:

  1. Crea e collega il datastore delle identità.
  2. Importa identità esterne (ad esempio, external_group:wp-admins). Non includere il prefisso external_group: durante l'importazione, ad esempio:

    JSON

      {
        "externalIdentity": "wp-admins",
        "userId": "user@example.com"
      }
    
  3. Nelle informazioni ACL del tuo documento, definisci l'ID entità esterna in principal identifier. Quando fai riferimento a gruppi personalizzati, utilizza il prefisso external_group: nel campo groupId.

  4. Il prefisso external_group: è obbligatorio per gli ID gruppo all'interno delle informazioni ACL del documento durante l'importazione, ma non viene utilizzato durante l'importazione delle identità nell'archivio di mapping. Esempio di documento con mappatura delle identità:

    JSON

      {
        "id": "108",
        "aclInfo": {
          "readers": [
            {
              "principals": [
                {
                  "userId": "cloudysanfrancisco@gmail.com"
                },
                {
                  "groupId": "external_group:wp-admins"
                }
              ]
            }
          ]
        },
        "structData": {
          "id": 108,
          "date": "2025-04-24T18:16:04",
          ...
        }
      }
    

Passaggi successivi

Librerie client Gemini Enterprise