Nesta página, descrevemos como criar um conector personalizado.
Antes de começar
Antes de começar, confira se você tem o seguinte:
Verifique se o faturamento está ativado no seu projeto do Google Cloud .
Instale e inicialize a CLI Google Cloud . Verifique se ele está autenticado para seu projeto.
Consiga acesso de administrador do Discovery Engine para seu projeto Google Cloud .
Receba credenciais de acesso para sua fonte de dados de terceiros, como chaves de API ou autenticação de banco de dados.
Crie um plano de mapeamento de dados claro. Isso precisa incluir quais campos indexar e como representar o controle de acesso, incluindo identidades de terceiros.
Criar conector básico
Esta seção demonstra como criar um conector personalizado no idioma escolhido. Os princípios e padrões mostrados aqui se aplicam a qualquer sistema externo. Basta adaptar as chamadas de API e as transformações de dados para sua fonte específica no idioma escolhido para criar um conector básico.
Buscar dados
Para começar, recupere os dados da sua fonte de dados de terceiros. Neste exemplo, demonstramos como buscar postagens usando paginação. Para ambientes de produção, recomendamos usar uma abordagem de streaming para conjuntos de dados grandes. Isso evita problemas de memória que podem ocorrer ao carregar todos os dados de uma vez.
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
Transformar dados
Para converter seus dados de origem no formato de documento do Discovery Engine, estruture-os conforme mostrado no exemplo de payload a seguir. Você pode incluir quantos pares de chave-valor forem necessários. Por exemplo, você pode incluir o conteúdo completo para uma pesquisa abrangente. Você também pode incluir campos estruturados para uma pesquisa facetada ou uma combinação dos dois.
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
Recuperar ou criar um armazenamento de identidade
Para gerenciar identidades de usuários e grupos para controle de acesso, é necessário recuperar ou criar um armazenamento de identidades. Essa função recebe um repositório de identidades existente pelo ID, projeto e local. Se a loja de identidades não existir, ela vai criar e retornar uma nova loja vazia.
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
A função get_or_create_ims_data_store usa as seguintes variáveis de chave:
project_id: o ID do seu projeto do Google Cloud .location: o Google Cloud local do repositório de mapeamento de identidade.identity_mapping_store_id: um identificador exclusivo para o armazenamento de identidades.client_ims: uma instância de discoveryengine.IdentityMappingStoreServiceClient usada para interagir com a API de armazenamento de identidades.parent_ims: o nome do recurso do local principal, construído usando client_ims.location_path.name: o nome completo do recurso do repositório de mapeamento de identidade, usado para GetIdentityMappingStoreRequest.
Ingerir mapeamento de identidade no armazenamento de identidades
Para carregar entradas de mapeamento de identidade no repositório especificado, use esta função. Ele recebe uma lista de entradas de mapeamento de identidade e inicia uma operação de importação inline. Isso é fundamental para estabelecer as relações de usuário, grupo e identidade externa necessárias para controle de acesso e personalização.
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
A função "load_ims_data" usa as seguintes variáveis principais:
ims_store: o objeto discoveryengine.DataStore que representa o repositório de mapeamento de identidade em que os dados serão carregados.id_mapping_data: uma lista de objetos "discoveryengine.IdentityMappingEntry", cada um contendo uma identidade externa e o ID de usuário ou grupo correspondente.result: valor de retorno do tipo discoveryengine.DataStore.
Criar repositório de dados
Para usar um conector personalizado, inicialize um repositório de dados para seu conteúdo. Use o default_collection para conectores personalizados. O parâmetro IndustryVertical personaliza o comportamento do repositório de dados para casos de uso específicos. O GENERIC é adequado para a maioria dos cenários. No entanto, é possível escolher um valor diferente para um setor específico, como MEDIA ou HEALTHCARE_FHIR. Configure o nome de exibição e outras propriedades para se alinhar às convenções e aos requisitos de nomenclatura do projeto.
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
A função "get_or_create_data_store" usa as seguintes variáveis principais:
project_id: o ID do seu projeto do Google Cloud .location: o Google Cloud local do repositório de dados.display_name: o nome de exibição legível do repositório de dados.data_store_id: um identificador exclusivo para o repositório de dados.identity_mapping_store: o nome do recurso do armazenamento de mapeamento de identidade a ser vinculado.result: valor de retorno do tipo discoveryengine.DataStore.
Fazer upload de documentos inline
Para enviar documentos diretamente ao Discovery Engine, use o upload inline. Esse método usa o modo de reconciliação incremental por padrão e não é compatível com o modo de reconciliação completa. No modo incremental, novos documentos são adicionados e os existentes são atualizados, mas os documentos que não estão mais na origem não são excluídos. O modo de reconciliação completa sincroniza o repositório de dados com os dados de origem, incluindo a exclusão de documentos que não estão mais presentes na origem.
A conciliação incremental é ideal para sistemas como um CRM que processam mudanças pequenas e frequentes nos dados. Em vez de sincronizar todo o banco de dados, envie apenas mudanças específicas, tornando o processo mais rápido e eficiente. Uma sincronização completa ainda pode ser realizada periodicamente para manter a integridade geral dos dados.
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
A função upload_documents_inline usa as seguintes variáveis principais:
project_id: o ID do seu projeto do Google Cloud .location: o Google Cloud local do repositório de dados.data_store_id: o ID do repositório de dados.branch_id: o ID da ramificação no repositório de dados (normalmente "0").documents: uma lista de objetos discoveryengine.Document a serem enviados.result: valor de retorno do tipo discoveryengine.ImportDocumentsMetadata.
Validar o conector
Para validar se o conector está funcionando como esperado, faça um teste para garantir o fluxo adequado de dados da origem para o Discovery Engine.
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
Valide se o código do conector usa as seguintes variáveis principais:
SITE: o URL base da fonte de dados de terceiros.PROJECT_ID: o ID do projeto do Google Cloud .LOCATION: o Google Cloud local dos recursos.IDENTITY_MAPPING_STORE_ID: um ID exclusivo para seu Identity Mapping Store.DATA_STORE_ID: um ID exclusivo para seu repositório de dados.BRANCH_ID: o ID da ramificação no repositório de dados.posts: armazena as postagens buscadas da fonte de terceiros.docs: armazena os documentos convertidos no formato discoveryengine.Document.ims_store: o objeto discoveryengine.DataStore recuperado ou criado para mapeamento de identidade.RAW_IDENTITY_MAPPING_DATA: uma lista de objetos "discoveryengine.IdentityMappingEntry".
Saída esperada:
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
Você também pode conferir o repositório de dados no console do Google Google Cloud neste momento:
Criar conector com Google Cloud Upload de armazenamento
Embora a importação inline funcione bem para desenvolvimento, os conectores de produção precisam usar o Google Cloud Storage para melhorar a escalonabilidade e ativar o modo de reconciliação completa. Essa abordagem processa grandes conjuntos de dados de maneira eficiente e oferece suporte à exclusão automática de documentos que não estão mais presentes na fonte de dados de terceiros.
Converter documentos para JSONL
Para preparar documentos para importação em massa no Discovery Engine, converta-os para o formato JSON Lines.
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
A função "convert_documents_to_jsonl" usa a seguinte variável:
documents: uma lista de objetos discoveryengine.Document a serem convertidos.
Fazer upload para o Google Cloud Storage
Para ativar a importação em massa eficiente, prepare seus dados no Google Cloud Storage.
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
A função "upload_jsonl_to_gcs" usa as seguintes variáveis de chave:
jsonl: o conteúdo da string formatada em JSONL a ser enviada.bucket_name: o nome do bucket de armazenamento do Google Cloud .blob_name: o nome do blob (objeto) no bucket especificado.
Importar do Google Cloud Storage com reconciliação completa
Para fazer uma sincronização completa de dados usando o modo de reconciliação completa, use este método. Isso garante que seu repositório de dados reflita exatamente a fonte de dados de terceiros, removendo automaticamente os documentos que não existem mais.
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
A função "import_documents_from_gcs" usa as seguintes variáveis principais:
project_id: o ID do seu projeto do Google Cloud .location: o Google Cloud local do repositório de dados.data_store_id: o ID do repositório de dados.branch_id: o ID da ramificação no repositório de dados (normalmente "0").gcs_uri: o URI do Google Cloud Storage que aponta para o arquivo JSONL.
Teste Google Cloud de upload do Storage
Para verificar o fluxo de trabalho de importação com base no Google Cloud Storage, execute o seguinte:
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
As seguintes variáveis principais são usadas para testar o upload do Google Cloud Storage:
BUCKET: o nome do bucket de armazenamento do Google Cloud .BLOB: o caminho para o blob no bucket.SITE: o URL base da fonte de dados de terceiros.PROJECT_ID: o ID do projeto do Google Cloud .LOCATION: o Google Cloud local dos recursos (por exemplo, "global").IDENTITY_MAPPING_STORE_ID: um ID exclusivo para seu Identity Mapping Store.DATA_STORE_ID: um ID exclusivo para seu repositório de dados.BRANCH_ID: o ID da ramificação no repositório de dados (normalmente "0").jsonl_payload: os documentos convertidos para o formato JSONL.gcs_uri: o URI do Google Cloud Storage do arquivo JSONL enviado por upload.
Saída esperada:
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
Gerenciar permissões
Para gerenciar o acesso no nível do documento em ambientes empresariais, o Gemini Enterprise é compatível com listas de controle de acesso (ACLs) e mapeamento de identidade, que ajudam a limitar o conteúdo que os usuários podem acessar.
Ativar ACLs no repositório de dados
Para ativar as ACLs ao criar o repositório de dados, execute o seguinte:
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
Adicionar ACLs a documentos
Para calcular e incluir AclInfo ao transformar os documentos, execute o seguinte:
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
Tornar o conteúdo público
Para tornar um documento acessível publicamente, defina o campo readers da seguinte maneira:
Python
readers=[{"idp_wide": True}]
Validar ACLs
Para validar se as configurações de ACL estão funcionando conforme o esperado, considere o seguinte:
Pesquise como um usuário que não tem acesso ao documento.
Inspecione a estrutura do documento enviado no Cloud Storage e compare com uma referência.
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
Usar o mapeamento de identidade
Use o mapeamento de identidade nos seguintes cenários:
Sua fonte de dados de terceiros usa identidades que não são do Google
Você quer fazer referência a grupos personalizados (por exemplo, wp-admins) em vez de usuários individuais.
Sua API retorna apenas nomes de grupos
Você precisa agrupar usuários manualmente para escalonamento ou consistência
Para fazer o mapeamento de identidade, siga estas etapas:
- Crie e vincule o repositório de dados de identidade.
Importe identidades externas (por exemplo, external_group:wp-admins). Não inclua o prefixo external_group: ao importar. Por exemplo:
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }Nas informações da ACL do documento, defina o ID da entidade externa em
principal identifier. Ao fazer referência a grupos personalizados, use o prefixoexternal_group:no campogroupId.O prefixo
external_group:é obrigatório para IDs de grupo nas informações da ACL do documento durante a importação, mas não é usado ao importar identidades para o repositório de mapeamento. Exemplo de documento com mapeamento de identidade:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
A seguir
- Para fornecer uma interface do usuário para consultar seus dados, crie um app no Gemini Enterprise e conecte-o ao repositório de dados do conector personalizado.