Cette page explique comment créer un connecteur personnalisé.
Avant de commencer
Avant de commencer, assurez-vous de disposer des éléments suivants :
Vérifiez si la facturation est activée pour votre projet Google Cloud .
Installez et initialisez la CLI Google Cloud . Assurez-vous qu'il est authentifié pour votre projet.
Obtenez un accès administrateur Discovery Engine pour votre projet Google Cloud .
Obtenez des identifiants d'accès pour votre source de données tierce (tels que des clés API ou une authentification de base de données).
Créez un plan de mappage des données clair. Cela doit inclure les champs à indexer et la manière de représenter le contrôle des accès, y compris les identités tierces.
Créer un connecteur de base
Cette section explique comment créer un connecteur personnalisé dans la langue de votre choix. Les principes et les modèles présentés ici s'appliquent à tous les systèmes externes. Pour créer un connecteur de base, il vous suffit d'adapter les appels d'API et les transformations de données pour votre source spécifique dans la langue de votre choix.
Récupérer des données
Pour commencer, récupérez les données de votre source de données tierce. Dans cet exemple, nous allons vous montrer comment récupérer des posts à l'aide de la pagination. Pour les environnements de production, nous vous recommandons d'utiliser une approche de streaming pour les grands ensembles de données. Cela évite les problèmes de mémoire qui peuvent survenir lors du chargement de toutes les données en même temps.
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
Transformer les données
Pour convertir vos données sources au format de document Discovery Engine, structurez-les comme indiqué dans l'exemple de charge utile suivant. Vous pouvez inclure autant de paires clé/valeur que nécessaire. Par exemple, vous pouvez inclure l'intégralité du contenu pour une recherche complète. Vous pouvez également inclure des champs structurés pour une recherche à facettes, ou une combinaison des deux.
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
Récupérer ou créer un magasin d'identités
Pour gérer les identités et les groupes d'utilisateurs pour le contrôle des accès, vous devez récupérer ou créer un magasin d'identités. Cette fonction récupère un magasin d'identités existant par son ID, son projet et son emplacement. Si le magasin d'identités n'existe pas, il en crée un vide et le renvoie.
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
La fonction get_or_create_ims_data_store utilise les variables clés suivantes :
project_id: ID de votre projet Google Cloud .location: emplacement Google Cloud du magasin de mappage des identités.identity_mapping_store_id: identifiant unique du magasin d'identités.client_ims: instance de discoveryengine.IdentityMappingStoreServiceClient utilisée pour interagir avec l'API Identity Store.parent_ims: nom de ressource de l'emplacement parent, construit à l'aide de client_ims.location_path.name: nom complet de la ressource du magasin de mappage des identités, utilisé pour GetIdentityMappingStoreRequest.
Ingérer le mappage d'identité dans le magasin d'identités
Utilisez cette fonction pour charger des entrées de mappage d'identité dans le magasin d'identités spécifié. Elle prend une liste d'entrées de mappage d'identité et lance une opération d'importation intégrée. Cette étape est essentielle pour établir les relations entre les utilisateurs, les groupes et les identités externes nécessaires au contrôle des accès et à la personnalisation.
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
La fonction load_ims_data utilise les variables clés suivantes :
ims_store: objet discoveryengine.DataStore représentant le magasin de mappage des identités dans lequel les données seront chargées.id_mapping_data: liste d'objets discoveryengine.IdentityMappingEntry, chacun contenant une identité externe et l'ID utilisateur ou de groupe correspondant.result: valeur renvoyée de type discoveryengine.DataStore.
Créer un datastore
Pour utiliser un connecteur personnalisé, vous devez initialiser un data store pour votre contenu. Utilisez default_collection pour les connecteurs personnalisés. Le paramètre IndustryVertical permet de personnaliser le comportement du data store pour des cas d'utilisation spécifiques. GENERIC convient à la plupart des scénarios. Toutefois, vous pouvez choisir une autre valeur pour un secteur spécifique, par exemple MEDIA ou HEALTHCARE_FHIR. Configurez le nom à afficher et d'autres propriétés pour les aligner sur les conventions de dénomination et les exigences de votre projet.
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
La fonction get_or_create_data_store utilise les variables clés suivantes :
project_id: ID de votre projet Google Cloud .location: emplacement Google Cloud du data store.display_name: nom à afficher lisible pour le data store.data_store_id: identifiant unique du data store.identity_mapping_store: nom de ressource du magasin de mappage d'identité à lier.result: valeur renvoyée de type discoveryengine.DataStore.
Importer des documents intégrés
Pour envoyer directement des documents à Discovery Engine, utilisez l'importation intégrée. Cette méthode utilise le mode de réconciliation incrémentielle par défaut et n'est pas compatible avec le mode de réconciliation complète. En mode incrémentiel, de nouveaux documents sont ajoutés et ceux existants sont mis à jour, mais les documents qui ne figurent plus dans la source ne sont pas supprimés. Le mode de réconciliation complète synchronise le data store avec vos données sources, y compris en supprimant les documents qui ne sont plus présents dans la source.
La réconciliation incrémentielle est idéale pour les systèmes tels que les CRM qui gèrent des modifications fréquentes et mineures des données. Au lieu de synchroniser l'intégralité de la base de données, n'envoyez que les modifications spécifiques, ce qui rend le processus plus rapide et plus efficace. Une synchronisation complète peut toujours être effectuée périodiquement pour maintenir l'intégrité globale des données.
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
La fonction upload_documents_inline utilise les variables clés suivantes :
project_id: ID de votre projet Google Cloud .location: emplacement Google Cloud du data store.data_store_id: ID du data store.branch_id: ID de la branche dans le data store (généralement "0").documents: liste d'objets discoveryengine.Document à importer.result: valeur renvoyée de type discoveryengine.ImportDocumentsMetadata.
Valider votre connecteur
Pour vérifier que votre connecteur fonctionne comme prévu, effectuez un test pour vous assurer que les données circulent correctement de la source à Discovery Engine.
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
Validez le code de votre connecteur à l'aide des variables clés suivantes :
SITE: URL de base de la source de données tierce.PROJECT_ID: ID de votre projet Google Cloud .LOCATION: emplacement Google Cloud des ressources.IDENTITY_MAPPING_STORE_ID: ID unique de votre magasin de mappage des identités.DATA_STORE_ID: ID unique de votre data store.BRANCH_ID: ID de la branche dans le data store.posts: stocke les posts récupérés à partir de la source tierce.docs: stocke les documents convertis au format discoveryengine.Document.ims_store: objet discoveryengine.DataStore récupéré ou créé pour le mappage des identités.RAW_IDENTITY_MAPPING_DATA: liste d'objets discoveryengine.IdentityMappingEntry.
Résultat attendu :
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
Vous pouvez également voir votre data store dans la console Google Google Cloud à ce stade :
Créer un connecteur avec l'option Google Cloud Importer depuis le stockage
Bien que l'importation intégrée fonctionne bien pour le développement, les connecteurs de production doivent utiliser Google Cloud Storage pour une meilleure évolutivité et pour activer le mode de réconciliation complète. Cette approche gère efficacement les grands ensembles de données et permet la suppression automatique des documents qui ne sont plus présents dans la source de données tierce.
Convertir des documents au format JSONL
Pour préparer des documents à l'importation groupée dans Discovery Engine, convertissez-les au format JSON Lines.
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
La fonction convert_documents_to_jsonl utilise la variable suivante :
documents: liste des objets discoveryengine.Document à convertir.
Importer dans Google Cloud Storage
Pour activer l'importation groupée efficace, préparez vos données dans Google Cloud Storage.
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
La fonction upload_jsonl_to_gcs utilise les variables clés suivantes :
jsonl: contenu de la chaîne au format JSONL à importer.bucket_name: nom du bucket de stockage Google Cloud .blob_name: nom du blob (objet) dans le bucket spécifié.
Importer depuis Google Cloud Storage avec réconciliation complète
Utilisez cette méthode pour effectuer une synchronisation complète des données à l'aide du mode de réconciliation complète. Cela garantit que votre data store reflète exactement la source de données tierce, en supprimant automatiquement tous les documents qui n'existent plus.
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
La fonction import_documents_from_gcs utilise les variables clés suivantes :
project_id: ID de votre projet Google Cloud .location: emplacement Google Cloud du data store.data_store_id: ID du data store.branch_id: ID de la branche dans le data store (généralement "0").gcs_uri: URI Google Cloud Storage pointant vers le fichier JSONL.
Test Google Cloud Importation dans Storage
Pour vérifier le workflow d'importation basé sur le stockage Google Cloud , exécutez la commande suivante :
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
Les variables clés suivantes sont utilisées pour tester l'importation Google Cloud Storage :
BUCKET: nom du bucket de stockage Google Cloud .BLOB: chemin d'accès au blob dans le bucket.SITE: URL de base de la source de données tierce.PROJECT_ID: ID de votre projet Google Cloud .LOCATION: emplacement Google Cloud des ressources (par exemple, "global").IDENTITY_MAPPING_STORE_ID: ID unique de votre magasin de mappage des identités.DATA_STORE_ID: ID unique de votre data store.BRANCH_ID: ID de la branche dans le data store (généralement "0").jsonl_payload: documents convertis au format JSONL.gcs_uri: URI Google Cloud Storage du fichier JSONL importé.
Résultat attendu :
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
Gérer les autorisations
Pour gérer l'accès au niveau des documents dans les environnements d'entreprise, Gemini Enterprise est compatible avec les listes de contrôle d'accès (LCA) et le mappage d'identité, qui permettent de limiter le contenu que les utilisateurs peuvent voir.
Activer les LCA dans le data store
Pour activer les LCA lors de la création de votre data store, exécutez la commande suivante :
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
Ajouter des LCA aux documents
Pour calculer et inclure AclInfo lors de la transformation des documents, exécutez la commande suivante :
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
Rendre du contenu public
Pour rendre un document accessible au public, définissez le champ readers comme suit :
Python
readers=[{"idp_wide": True}]
Valider les LCA
Pour vérifier que vos configurations de LCA fonctionnent comme prévu, tenez compte des points suivants :
Effectuez une recherche en tant qu'utilisateur n'ayant pas accès au document.
Inspectez la structure du document importé dans Cloud Storage et comparez-la à une référence.
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
Utiliser le mappage d'identité
Utilisez le mappage d'identité dans les cas suivants :
Votre source de données tierce utilise des identités non Google
Vous souhaitez faire référence à des groupes personnalisés (par exemple, "administrateurs WP") plutôt qu'à des utilisateurs individuels.
Votre API ne renvoie que les noms de groupes
Vous devez regrouper manuellement les utilisateurs pour assurer l'évolutivité ou la cohérence.
Pour effectuer le mappage des identités, procédez comme suit :
- Créez et associez le data store d'identité.
Importez des identités externes (par exemple, external_group:wp-admins). N'incluez pas le préfixe external_group: lors de l'importation. Par exemple :
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }Dans les informations sur la LCA de votre document, définissez l'ID d'entité externe dans
principal identifier. Lorsque vous faites référence à des groupes personnalisés, utilisez le préfixeexternal_group:dans le champgroupId.Le préfixe
external_group:est obligatoire pour les ID de groupe dans les informations LCA du document lors de l'importation, mais il n'est pas utilisé lors de l'importation d'identités dans le magasin de mappage. Exemple de document avec mappage d'identité :JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
Étapes suivantes
- Pour fournir une interface utilisateur permettant d'interroger vos données, créez une application dans Gemini Enterprise et associez-la au connecteur personnalisé existant du datastore.