Use o Vertex AI RAG Engine na API Gemini Live

A geração aumentada por obtenção (RAG) é uma técnica usada para obter e fornecer informações relevantes aos GMLs para gerar respostas verificáveis. As informações podem incluir informações atualizadas, um tópico e contexto ou factos reais.

Esta página mostra como usar o motor RAG do Vertex AI com a API Gemini Live, que lhe permite especificar e obter informações do corpus RAG.

Pré-requisitos

Os seguintes pré-requisitos têm de ser concluídos antes de poder usar o motor RAG do Vertex AI com a API Live multimodal:

  1. Ative a API RAG no Vertex AI.

  2. Crie o exemplo de corpus RAG.

  3. Para carregar ficheiros para o corpus RAG, consulte o exemplo de API de importação de ficheiros RAG.

Configurar

Pode usar o Vertex AI RAG Engine com a API Live especificando o Vertex AI RAG Engine como uma ferramenta. O exemplo de código seguinte demonstra como especificar o motor RAG do Vertex AI como uma ferramenta:

Substitua as seguintes variáveis:

  • YOUR_PROJECT_ID: o ID do seu projeto do Google Cloud .
  • YOUR_CORPUS_ID: o ID do seu corpus.
  • YOUR_LOCATION: a região para processar o pedido.
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
        "rag_resources": {
        "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
      }
    }
  }
}

Use o Websocket para comunicação em tempo real

Para ativar a comunicação em tempo real entre um cliente e um servidor, tem de usar um Websocket. Estes exemplos de código demonstram como usar um Websocket usando a API Python e o SDK Python.

API do Python

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

SDK Python

Para saber como instalar o SDK de IA generativa, consulte o artigo Instale uma biblioteca:

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

Use o Vertex AI RAG Engine como o armazenamento de contexto

Pode usar o motor RAG do Vertex AI como o armazenamento de contexto para a API Gemini Live para armazenar o contexto da sessão de modo a formar e obter contextos anteriores relacionados com a sua conversa e enriquecer o contexto atual para a geração de modelos. Também pode tirar partido desta funcionalidade para partilhar contextos nas suas diferentes sessões da API Live.

O Vertex AI RAG Engine suporta o armazenamento e a indexação das seguintes formas de dados de contextos de sessão:

  • Texto
  • Áudio de voz

Crie um corpus do tipo MemoryCorpus

Para armazenar e indexar textos de conversas a partir do contexto da sessão, tem de criar um corpus RAG do tipo MemoryCorpus. Também tem de especificar um analisador sintático de MDIs na configuração do corpus de memória que é usado para analisar os contextos das sessões armazenados a partir da API Live para criar memória para indexação.

Este exemplo de código demonstra como criar um corpus. No entanto, substitua primeiro as variáveis por valores.

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

Especifique o seu corpus de memória para armazenar contextos

Quando usar o seu corpus de memória com a API Live, tem de especificar o corpus de memória como uma ferramenta de obtenção e, em seguida, definir store_context como true para permitir que a API Live armazene os contextos da sessão.

Este exemplo de código demonstra como especificar o seu corpus de memória para armazenar contextos. No entanto, primeiro, substitua as variáveis por valores.

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

O que se segue?