Créer un connecteur personnalisé

Cette page explique comment créer un connecteur personnalisé.

Avant de commencer

Avant de commencer, assurez-vous de disposer des éléments suivants :

  • Vérifiez si la facturation est activée pour votre projet Google Cloud .

  • Installez et initialisez la CLI Google Cloud . Assurez-vous qu'il est authentifié pour votre projet.

  • Obtenez un accès administrateur Discovery Engine pour votre projet Google Cloud .

  • Obtenez des identifiants d'accès pour votre source de données tierce (tels que des clés API ou une authentification de base de données).

  • Créez un plan de mappage des données clair. Cela doit inclure les champs à indexer et la manière de représenter le contrôle des accès, y compris les identités tierces.

Créer un connecteur de base

Cette section explique comment créer un connecteur personnalisé dans la langue de votre choix. Les principes et les modèles présentés ici s'appliquent à tous les systèmes externes. Pour créer un connecteur de base, il vous suffit d'adapter les appels d'API et les transformations de données pour votre source spécifique dans la langue de votre choix.

Récupérer des données

Pour commencer, récupérez les données de votre source de données tierce. Dans cet exemple, nous allons vous montrer comment récupérer des posts à l'aide de la pagination. Pour les environnements de production, nous vous recommandons d'utiliser une approche de streaming pour les grands ensembles de données. Cela évite les problèmes de mémoire qui peuvent survenir lors du chargement de toutes les données en même temps.

Python

    def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
        #Fetch all posts from the given site.#
        url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
        posts: List[dict] = []
        page = 1
        while True:
            resp = requests.get(
                url,
                params={"page": page, "per_page": per_page},
            )
            resp.raise_for_status()
            batch = resp.json()
            posts.extend(batch)
            if len(batch) < per_page:
                break
            page += 1
        return posts

Transformer les données

Pour convertir vos données sources au format de document Discovery Engine, structurez-les comme indiqué dans l'exemple de charge utile suivant. Vous pouvez inclure autant de paires clé/valeur que nécessaire. Par exemple, vous pouvez inclure l'intégralité du contenu pour une recherche complète. Vous pouvez également inclure des champs structurés pour une recherche à facettes, ou une combinaison des deux.

Python

    def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
        # Convert WP posts into Discovery Engine Document messages.
        docs: List[discoveryengine.Document] = []
        for post in posts:
            payload = {
                "title": post.get("title", {}).get("rendered"),
                "body": post.get("content", {}).get("rendered"),
                "url": post.get("link"),
                "author": post.get("author"),
                "categories": post.get("categories"),
                "tags": post.get("tags"),
                "date": post.get("date"),
            }
            doc = discoveryengine.Document(
                id=str(post["id"]),
                json_data=json.dumps(payload),
            )
            docs.append(doc)
        return docs

Récupérer ou créer un magasin d'identités

Pour gérer les identités et les groupes d'utilisateurs pour le contrôle des accès, vous devez récupérer ou créer un magasin d'identités. Cette fonction récupère un magasin d'identités existant par son ID, son projet et son emplacement. Si le magasin d'identités n'existe pas, il en crée un vide et le renvoie.

Python

    def get_or_create_ims_data_store(
        project_id: str,
        location: str,
        identity_mapping_store_id: str,
    ) -> discoveryengine.DataStore:
      """Get or create a DataStore."""
      # Initialize the client
      client_ims = discoveryengine.IdentityMappingStoreServiceClient()
      # Construct the parent resource name
      parent_ims = client_ims.location_path(project=project_id, location=location)

      try:
        # Create the request object
        name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
        request = discoveryengine.GetIdentityMappingStoreRequest(
            name=name,
        )
        return client_ims.get_identity_mapping_store(request=request)
      except:
        # Create the IdentityMappingStore object (it can be empty for basic creation)
        identity_mapping_store = discoveryengine.IdentityMappingStore()
        # Create the request object
        request = discoveryengine.CreateIdentityMappingStoreRequest(
            parent=parent_ims,
            identity_mapping_store=identity_mapping_store,
            identity_mapping_store_id=identity_mapping_store_id,
        )
        return client_ims.create_identity_mapping_store(request=request)

La fonction get_or_create_ims_data_store utilise les variables clés suivantes :

  • project_id : ID de votre projet Google Cloud .
  • location : emplacement Google Cloud du magasin de mappage des identités.
  • identity_mapping_store_id : identifiant unique du magasin d'identités.
  • client_ims : instance de discoveryengine.IdentityMappingStoreServiceClient utilisée pour interagir avec l'API Identity Store.
  • parent_ims : nom de ressource de l'emplacement parent, construit à l'aide de client_ims.location_path.
  • name : nom complet de la ressource du magasin de mappage des identités, utilisé pour GetIdentityMappingStoreRequest.

Ingérer le mappage d'identité dans le magasin d'identités

Utilisez cette fonction pour charger des entrées de mappage d'identité dans le magasin d'identités spécifié. Elle prend une liste d'entrées de mappage d'identité et lance une opération d'importation intégrée. Cette étape est essentielle pour établir les relations entre les utilisateurs, les groupes et les identités externes nécessaires au contrôle des accès et à la personnalisation.

Python

def load_ims_data(
    ims_store: discoveryengine.DataStore,
    id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
  """Get the IMS data store."""
  # Initialize the client
  client_ims = discoveryengine.IdentityMappingStoreServiceClient()

  #  Create the InlineSource object
  inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
      identity_mapping_entries=id_mapping_data
  )

  # Create the main request object
  request_ims = discoveryengine.ImportIdentityMappingsRequest(
      identity_mapping_store=ims_store.name,
      inline_source=inline_source,
  )

  try:
    # Create the InlineSource object, which holds your list of entries
    operation = client_ims.import_identity_mappings(
        request=request_ims,
    )
    result = operation.result()
    return result

  except Exception as e:
    print(f"IMS Load Error: {e}")
    result = operation.result()
    return result

La fonction load_ims_data utilise les variables clés suivantes :

  • ims_store : objet discoveryengine.DataStore représentant le magasin de mappage des identités dans lequel les données seront chargées.
  • id_mapping_data : liste d'objets discoveryengine.IdentityMappingEntry, chacun contenant une identité externe et l'ID utilisateur ou de groupe correspondant.
  • result : valeur renvoyée de type discoveryengine.DataStore.

Créer un datastore

Pour utiliser un connecteur personnalisé, vous devez initialiser un data store pour votre contenu. Utilisez default_collection pour les connecteurs personnalisés. Le paramètre IndustryVertical permet de personnaliser le comportement du data store pour des cas d'utilisation spécifiques. GENERIC convient à la plupart des scénarios. Toutefois, vous pouvez choisir une autre valeur pour un secteur spécifique, par exemple MEDIA ou HEALTHCARE_FHIR. Configurez le nom à afficher et d'autres propriétés pour les aligner sur les conventions de dénomination et les exigences de votre projet.

Python

def get_or_create_data_store(
    project_id: str,
    location: str,
    display_name: str,
    data_store_id: str,
    identity_mapping_store: str,
) -> discoveryengine.DataStore:
  """Get or create a DataStore."""
  client = discoveryengine.DataStoreServiceClient()
  ds_name = client.data_store_path(project_id, location, data_store_id)
  try:
    result = client.get_data_store(request={"name": ds_name})
    return result
  except:
    parent = client.collection_path(project_id, location, "default_collection")
    operation = client.create_data_store(
        request={
            "parent": parent,
            "data_store": discoveryengine.DataStore(
                display_name=display_name,
                acl_enabled=True,
                industry_vertical=discoveryengine.IndustryVertical.GENERIC,
                identity_mapping_store=identity_mapping_store,
            ),
            "data_store_id": data_store_id,
        }
    )
    result = operation.result()
    return result

La fonction get_or_create_data_store utilise les variables clés suivantes :

  • project_id : ID de votre projet Google Cloud .
  • location : emplacement Google Cloud du data store.
  • display_name : nom à afficher lisible pour le data store.
  • data_store_id : identifiant unique du data store.
  • identity_mapping_store : nom de ressource du magasin de mappage d'identité à lier.
  • result : valeur renvoyée de type discoveryengine.DataStore.

Importer des documents intégrés

Pour envoyer directement des documents à Discovery Engine, utilisez l'importation intégrée. Cette méthode utilise le mode de réconciliation incrémentielle par défaut et n'est pas compatible avec le mode de réconciliation complète. En mode incrémentiel, de nouveaux documents sont ajoutés et ceux existants sont mis à jour, mais les documents qui ne figurent plus dans la source ne sont pas supprimés. Le mode de réconciliation complète synchronise le data store avec vos données sources, y compris en supprimant les documents qui ne sont plus présents dans la source.

La réconciliation incrémentielle est idéale pour les systèmes tels que les CRM qui gèrent des modifications fréquentes et mineures des données. Au lieu de synchroniser l'intégralité de la base de données, n'envoyez que les modifications spécifiques, ce qui rend le processus plus rapide et plus efficace. Une synchronisation complète peut toujours être effectuée périodiquement pour maintenir l'intégrité globale des données.

Python

    def upload_documents_inline(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        documents: List[discoveryengine.Document],
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Inline import of Document messages."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
                documents=documents,
            ),
        )
        operation = client.import_documents(request=request)
        operation.result()
        result = operation.metadata
        return result

La fonction upload_documents_inline utilise les variables clés suivantes :

  • project_id : ID de votre projet Google Cloud .
  • location : emplacement Google Cloud du data store.
  • data_store_id : ID du data store.
  • branch_id : ID de la branche dans le data store (généralement "0").
  • documents : liste d'objets discoveryengine.Document à importer.
  • result : valeur renvoyée de type discoveryengine.ImportDocumentsMetadata.

Valider votre connecteur

Pour vérifier que votre connecteur fonctionne comme prévu, effectuez un test pour vous assurer que les données circulent correctement de la source à Discovery Engine.

Python

    SITE = "https://altostrat.com"
    PROJECT_ID = "ucs-3p-connectors-testing"
    LOCATION = "global"
    IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
    DATA_STORE_ID = "my-acl-ds-id1"
    BRANCH_ID = "0"

    posts = fetch_posts(SITE)
    docs = convert_posts_to_documents(posts)
    print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")

    try:
      # Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
      ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
      print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")

      RAW_IDENTITY_MAPPING_DATA = [
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_1",
              user_id="testuser1@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              user_id="testuser2@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              group_id="testgroup1@example.com",
          )
      ]

      # Step #2: Load IMS Data
      response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
      print(
          "\nStep #2: Load Data in IMS Store successful.", response
      )

      # Step #3: Create Entity Data Store & Bind IMS Data Store
      data_store =  get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
      print("\nStep #3: Entity Data Store Create Result: ", data_store)

      metadata = upload_documents_inline(
          PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
      )
      print(f"Uploaded {metadata.success_count} documents inline.")

    except gcp_exceptions.GoogleAPICallError as e:
      print(f"\n--- API Call Failed ---")
      print(f"Server Error Message: {e.message}")
      print(f"Status Code: {e.code}")

    except Exception as e:
      print(f"An error occurred: {e}")

Validez le code de votre connecteur à l'aide des variables clés suivantes :

  • SITE : URL de base de la source de données tierce.
  • PROJECT_ID : ID de votre projet Google Cloud .
  • LOCATION : emplacement Google Cloud des ressources.
  • IDENTITY_MAPPING_STORE_ID : ID unique de votre magasin de mappage des identités.
  • DATA_STORE_ID : ID unique de votre data store.
  • BRANCH_ID : ID de la branche dans le data store.
  • posts : stocke les posts récupérés à partir de la source tierce.
  • docs : stocke les documents convertis au format discoveryengine.Document.
  • ims_store : objet discoveryengine.DataStore récupéré ou créé pour le mappage des identités.
  • RAW_IDENTITY_MAPPING_DATA : liste d'objets discoveryengine.IdentityMappingEntry.

Résultat attendu :

Shell

  Fetched 20 posts and converted to 20 documents.
  STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
  Step #2: Load Data in IMS Store successful.
  Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
  display_name: "my-acl-datastore"
  industry_vertical: GENERIC
  create_time {
    seconds: 1760906997
    nanos: 192641000
  }
  default_schema_id: "default_schema"
  acl_enabled: true
  identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
  Uploaded 20 documents inline.

Vous pouvez également voir votre data store dans la console Google Google Cloud à ce stade :

Data store de connecteur personnalisé
data store de connecteur personnalisé.

Créer un connecteur avec l'option Google Cloud Importer depuis le stockage

Bien que l'importation intégrée fonctionne bien pour le développement, les connecteurs de production doivent utiliser Google Cloud Storage pour une meilleure évolutivité et pour activer le mode de réconciliation complète. Cette approche gère efficacement les grands ensembles de données et permet la suppression automatique des documents qui ne sont plus présents dans la source de données tierce.

Convertir des documents au format JSONL

Pour préparer des documents à l'importation groupée dans Discovery Engine, convertissez-les au format JSON Lines.

Python

    def convert_documents_to_jsonl(
        documents: List[discoveryengine.Document],
    ) -> str:
        """Serialize Document messages to JSONL."""
        return "\n".join(
            discoveryengine.Document.to_json(doc, indent=None)
            for doc in documents
        ) + "\n"

La fonction convert_documents_to_jsonl utilise la variable suivante :

  • documents : liste des objets discoveryengine.Document à convertir.

Importer dans Google Cloud  Storage

Pour activer l'importation groupée efficace, préparez vos données dans Google Cloud Storage.

Python

    def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
        """Upload JSONL content to Google Cloud Storage."""
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(jsonl, content_type="application/json")
        return f"gs://{bucket_name}/{blob_name}"

La fonction upload_jsonl_to_gcs utilise les variables clés suivantes :

  • jsonl : contenu de la chaîne au format JSONL à importer.
  • bucket_name : nom du bucket de stockage Google Cloud .
  • blob_name : nom du blob (objet) dans le bucket spécifié.

Importer depuis Google Cloud  Storage avec réconciliation complète

Utilisez cette méthode pour effectuer une synchronisation complète des données à l'aide du mode de réconciliation complète. Cela garantit que votre data store reflète exactement la source de données tierce, en supprimant automatiquement tous les documents qui n'existent plus.

Python

    def import_documents_from_gcs(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        gcs_uri: str,
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            gcs_source=gcs_source,
            reconciliation_mode=
                discoveryengine.ImportDocumentsRequest
                .ReconciliationMode.FULL,
        )
        operation = client.import_documents(request=request)
        operation.result()
        return operation.metadata

La fonction import_documents_from_gcs utilise les variables clés suivantes :

  • project_id : ID de votre projet Google Cloud .
  • location : emplacement Google Cloud du data store.
  • data_store_id : ID du data store.
  • branch_id : ID de la branche dans le data store (généralement "0").
  • gcs_uri : URI Google Cloud Storage pointant vers le fichier JSONL.

Test Google Cloud Importation dans Storage

Pour vérifier le workflow d'importation basé sur le stockage Google Cloud , exécutez la commande suivante :

Python

  BUCKET = "your-existing-bucket"
  BLOB = "path-to-any-blob/wp/posts.jsonl"
  SITE = "https://altostrat.com"
  PROJECT_ID = "ucs-3p-connectors-testing"
  LOCATION = "global"
  IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
  DATA_STORE_ID = "your-data-store-id"
  BRANCH_ID = "0"
  jsonl_payload = convert_documents_to_jsonl(docs)
  gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
  posts = fetch_posts(SITE)
  docs = convert_posts_to_documents(posts)
  print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
  print("Uploaded to:", gcs_uri)

  metadata = import_documents_from_gcs(
      PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
  )
  print(f"Imported: {metadata.success_count} documents")

Les variables clés suivantes sont utilisées pour tester l'importation Google Cloud Storage :

  • BUCKET : nom du bucket de stockage Google Cloud .
  • BLOB : chemin d'accès au blob dans le bucket.
  • SITE : URL de base de la source de données tierce.
  • PROJECT_ID : ID de votre projet Google Cloud .
  • LOCATION : emplacement Google Cloud des ressources (par exemple, "global").
  • IDENTITY_MAPPING_STORE_ID : ID unique de votre magasin de mappage des identités.
  • DATA_STORE_ID : ID unique de votre data store.
  • BRANCH_ID : ID de la branche dans le data store (généralement "0").
  • jsonl_payload : documents convertis au format JSONL.
  • gcs_uri : URI Google Cloud Storage du fichier JSONL importé.

Résultat attendu :

Shell

    Fetched 20 posts and converted to 20 documents.
    Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
    Imported: 20 documents

Gérer les autorisations

Pour gérer l'accès au niveau des documents dans les environnements d'entreprise, Gemini Enterprise est compatible avec les listes de contrôle d'accès (LCA) et le mappage d'identité, qui permettent de limiter le contenu que les utilisateurs peuvent voir.

Activer les LCA dans le data store

Pour activer les LCA lors de la création de votre data store, exécutez la commande suivante :

Python

  # get_or_create_data_store()
  "data_store": discoveryengine.DataStore(
      display_name=data_store_id,
      industry_vertical=discoveryengine.IndustryVertical.GENERIC,
      acl_enabled=True, # ADDED
  )

Ajouter des LCA aux documents

Pour calculer et inclure AclInfo lors de la transformation des documents, exécutez la commande suivante :

Python

  # convert_posts_to_documents()
  doc = discoveryengine.Document(
      id=str(post["id"]),
      json_data=json.dumps(payload),
      acl_info=discoveryengine.Document.AclInfo(
          readers=[{
              "principals": [
                  {"user_id": "baklavainthebalkans@gmail.com"},
                  {"user_id": "cloudysanfrancisco@gmail.com"}
              ]
          }]
      ),
  )

Rendre du contenu public

Pour rendre un document accessible au public, définissez le champ readers comme suit :

Python

  readers=[{"idp_wide": True}]

Valider les LCA

Pour vérifier que vos configurations de LCA fonctionnent comme prévu, tenez compte des points suivants :

  • Effectuez une recherche en tant qu'utilisateur n'ayant pas accès au document.

  • Inspectez la structure du document importé dans Cloud Storage et comparez-la à une référence.

JSON

  {
    "id": "108",
    "jsonData": "{...}",
    "aclInfo": {
      "readers": [
        {
          "principals": [
            { "userId": "baklavainthebalkans@gmail.com" },
            { "userId": "cloudysanfrancisco@gmail.com" }
          ],
          "idpWide": false
        }
      ]
    }
  }

Utiliser le mappage d'identité

Utilisez le mappage d'identité dans les cas suivants :

  • Votre source de données tierce utilise des identités non Google

  • Vous souhaitez faire référence à des groupes personnalisés (par exemple, "administrateurs WP") plutôt qu'à des utilisateurs individuels.

  • Votre API ne renvoie que les noms de groupes

  • Vous devez regrouper manuellement les utilisateurs pour assurer l'évolutivité ou la cohérence.

Pour effectuer le mappage des identités, procédez comme suit :

  1. Créez et associez le data store d'identité.
  2. Importez des identités externes (par exemple, external_group:wp-admins). N'incluez pas le préfixe external_group: lors de l'importation. Par exemple :

    JSON

      {
        "externalIdentity": "wp-admins",
        "userId": "user@example.com"
      }
    
  3. Dans les informations sur la LCA de votre document, définissez l'ID d'entité externe dans principal identifier. Lorsque vous faites référence à des groupes personnalisés, utilisez le préfixe external_group: dans le champ groupId.

  4. Le préfixe external_group: est obligatoire pour les ID de groupe dans les informations LCA du document lors de l'importation, mais il n'est pas utilisé lors de l'importation d'identités dans le magasin de mappage. Exemple de document avec mappage d'identité :

    JSON

      {
        "id": "108",
        "aclInfo": {
          "readers": [
            {
              "principals": [
                {
                  "userId": "cloudysanfrancisco@gmail.com"
                },
                {
                  "groupId": "external_group:wp-admins"
                }
              ]
            }
          ]
        },
        "structData": {
          "id": 108,
          "date": "2025-04-24T18:16:04",
          ...
        }
      }
    

Étapes suivantes

Bibliothèques clientes Gemini Enterprise