Crea un chatbot RAG con GKE e Cloud Storage

Questo tutorial mostra come integrare un'applicazione basata su un modello linguistico di grandi dimensioni (LLM) con generazione RAG (Retrieval-Augmented Generation) con file PDF che carichi in un bucket Cloud Storage.

Questa guida utilizza un database come motore di archiviazione e ricerca semantica che contiene le rappresentazioni (embedding) dei documenti caricati. Utilizzi il framework Langchain per interagire con gli incorporamenti e utilizzi i modelli Gemini disponibili tramite Vertex AI.

Langchain è un framework Python open source molto diffuso che semplifica molte attività di machine learning e dispone di interfacce per l'integrazione con diversi servizi di AI e database vettoriali.

Questo tutorial è destinato ad amministratori e architetti di piattaforme cloud, ingegneri ML e professionisti MLOps (DevOps) interessati al deployment di applicazioni LLM RAG in GKE e Cloud Storage.

Crea un cluster

Crea un cluster Qdrant, Elasticsearch o Postgres:

Qdrant

Segui le istruzioni riportate in Esegui il deployment di un database vettoriale Qdrant su GKE per creare un cluster Qdrant in esecuzione su un cluster GKE in modalità Autopilot o Standard.

Elasticsearch

Segui le istruzioni riportate in Esegui il deployment di un database vettoriale Elasticsearch su GKE per creare un cluster Elasticsearch in esecuzione su un cluster GKE in modalità Autopilot o Standard.

PGVector

Segui le istruzioni riportate in Esegui il deployment di un database vettoriale PostgreSQL su GKE per creare un cluster Postgres con PGVector in esecuzione su un cluster GKE in modalità Autopilot o standard.

Weaviate

Segui le istruzioni per eseguire il deployment di un database vettoriale Weaviate su GKE per creare un cluster Weaviate in esecuzione su un cluster GKE in modalità Autopilot o Standard.

Configura l'ambiente

Configura l'ambiente con Cloud Shell:

  1. Imposta le variabili di ambiente per il tuo progetto:

    Qdrant

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=qdrant
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=qdrant
    

    Sostituisci PROJECT_ID con l'Google Cloud ID progetto.

    Elasticsearch

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=elasticsearch
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=elastic
    

    Sostituisci PROJECT_ID con l'Google Cloud ID progetto.

    PGVector

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=postgres
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=pg-ns
    

    Sostituisci PROJECT_ID con l'Google Cloud ID progetto.

    Weaviate

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=weaviate
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=weaviate
    

    Sostituisci PROJECT_ID con l'Google Cloud ID progetto.

  2. Verifica che il cluster GKE sia in esecuzione:

    gcloud container clusters list --project=${PROJECT_ID} --location=${CONTROL_PLANE_LOCATION}
    

    L'output è simile al seguente:

    NAME                                    LOCATION        MASTER_VERSION      MASTER_IP     MACHINE_TYPE  NODE_VERSION        NUM_NODES STATUS
    [KUBERNETES_CLUSTER_PREFIX]-cluster   us-central1   1.30.1-gke.1329003  <EXTERNAL IP> e2-standard-2 1.30.1-gke.1329003   6        RUNNING
    
  3. Clona il repository del codice campione da GitHub:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. Vai alla directory databases:

    cd kubernetes-engine-samples/databases
    

Prepara l'infrastruttura

Crea un repository Artifact Registry, crea immagini Docker ed esegui il push delle immagini Docker in Artifact Registry:

  1. Crea un repository Artifact Registry:

    gcloud artifacts repositories create ${KUBERNETES_CLUSTER_PREFIX}-images \
        --repository-format=docker \
        --location=${REGION} \
        --description="Vector database images repository" \
        --async
    
  2. Imposta le autorizzazioni storage.objectAdmin e artifactregistry.admin sul account di servizio Compute Engine per utilizzare Cloud Build per creare ed eseguire il push delle immagini Docker per i servizi embed-docs e chatbot.

    export PROJECT_NUMBER=PROJECT_NUMBER
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/artifactregistry.admin"
    

    Sostituisci PROJECT_NUMBER con il numero del tuo progettoGoogle Cloud .

  3. Crea immagini Docker per i servizi embed-docs e chatbot. L'immagine embed-docs contiene il codice Python sia per l'applicazione che riceve le richieste di inoltro di Eventarc sia per il job di incorporamento.

    Qdrant

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit qdrant/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit qdrant/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    Elasticsearch

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit elasticsearch/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit elasticsearch/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    PGVector

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit postgres-pgvector/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit postgres-pgvector/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    Weaviate

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit weaviate/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit weaviate/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    
  4. Verifica le immagini:

    gcloud artifacts docker images list $DOCKER_REPO \
        --project=$PROJECT_ID \
        --format="value(IMAGE)"
    

    L'output è simile al seguente:

    $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/chatbot
    $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/embed-docs
    
  5. Esegui il deployment di un service account Kubernetes con le autorizzazioni per eseguire i job Kubernetes:

    Qdrant

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" qdrant/manifests/05-rag/service-account.yaml | kubectl -n qdrant apply -f -
    

    Elasticsearch

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" elasticsearch/manifests/05-rag/service-account.yaml | kubectl -n elastic apply -f -
    

    PGVector

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" postgres-pgvector/manifests/03-rag/service-account.yaml | kubectl -n pg-ns apply -f -
    

    Weaviate

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" weaviate/manifests/04-rag/service-account.yaml | kubectl -n weaviate apply -f -
    
  6. Quando utilizzi Terraform per creare il cluster GKE e hai impostato create_service_account su true, verrà creato e utilizzato un account di servizio separato dal cluster e dai nodi. Concedi il ruolo artifactregistry.serviceAgent a questo account di servizio Compute Engine per consentire ai nodi di estrarre l'immagine da Artifact Registry creata per embed-docs e chatbot.

    export CLUSTER_SERVICE_ACCOUNT=$(gcloud container clusters describe ${KUBERNETES_CLUSTER_PREFIX}-cluster \
    --location=${CONTROL_PLANE_LOCATION} \
    --format="value(nodeConfig.serviceAccount)")
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${CLUSTER_SERVICE_ACCOUNT}" \
    --role="roles/artifactregistry.serviceAgent"
    

    Se non concedi l'accesso al account di servizio, i tuoi nodi potrebbero riscontrare un problema di autorizzazione quando tentano di estrarre l'immagine da Artifact Registry durante il deployment dei servizi embed-docs e chatbot.

  7. Esegui il deployment di un deployment Kubernetes per i servizi embed-docs e chatbot. Un deployment è un oggetto API Kubernetes che ti consente di eseguire più repliche di pod distribuite tra i nodi di un cluster:

    Qdrant

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/chatbot.yaml | kubectl -n qdrant apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/docs-embedder.yaml | kubectl -n qdrant apply -f -
    

    Elasticsearch

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/chatbot.yaml | kubectl -n elastic apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/docs-embedder.yaml | kubectl -n elastic apply -f -
    

    PGVector

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/chatbot.yaml | kubectl -n pg-ns apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/docs-embedder.yaml | kubectl -n pg-ns apply -f -
    

    Weaviate

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/chatbot.yaml | kubectl -n weaviate apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/docs-embedder.yaml | kubectl -n weaviate apply -f -
    
  8. Abilita i trigger Eventarc per GKE:

    gcloud eventarc gke-destinations init
    

    Quando richiesto, inserisci y.

  9. Esegui il deployment del bucket Cloud Storage e crea un trigger Eventarc utilizzando Terraform:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=vector-database/terraform/cloud-storage init
    terraform -chdir=vector-database/terraform/cloud-storage apply \
      -var project_id=${PROJECT_ID} \
      -var region=${REGION} \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
      -var db_namespace=${DB_NAMESPACE}
    

    Quando richiesto, digita yes. Il completamento del comando potrebbe richiedere diversi minuti.

    Terraform crea le seguenti risorse:

    • Un bucket Cloud Storage in cui caricare i documenti
    • Un trigger Eventarc
    • Un Google Cloud service account denominato service_account_eventarc_name con l'autorizzazione a utilizzare Eventarc.
    • Un Google Cloud service account denominato service_account_bucket_name con l'autorizzazione a leggere il bucket e accedere ai modelli Vertex AI.

    L'output è simile al seguente:

    ... # Several lines of output omitted
    
    Apply complete! Resources: 15 added, 0 changed, 0 destroyed.
    
    ... # Several lines of output omitted
    

Caricare documenti ed eseguire query del chatbot

Carica i documenti demo ed esegui query per cercare nei documenti demo utilizzando il chatbot:

  1. Carica il documento di esempio carbon-free-energy.pdf nel bucket:

    gcloud storage cp vector-database/documents/carbon-free-energy.pdf gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs
    
  2. Verifica che il job di incorporamento dei documenti sia stato completato correttamente:

    kubectl get job -n ${DB_NAMESPACE}
    

    L'output è simile al seguente:

    NAME                            COMPLETIONS   DURATION   AGE
    docs-embedder1716570453361446   1/1           32s        71s
    
  3. Ottieni l'indirizzo IP esterno del bilanciatore del carico:

    export EXTERNAL_IP=$(kubectl -n ${DB_NAMESPACE} get svc chatbot --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
    echo http://${EXTERNAL_IP}:80
    
  4. Apri l'indirizzo IP esterno nel browser web:

    http://EXTERNAL_IP
    

    Il chatbot risponde con un messaggio simile al seguente:

    How can I help you?
    
  5. Fare domande sui contenuti dei documenti caricati. Se il chatbot non trova nulla, risponde I don't know. Ad esempio, potresti chiedere quanto segue:

    You: Hi, what are Google plans for the future?
    

    Un esempio di output del chatbot è simile al seguente:

    Bot: Google intends to run on carbon-free energy everywhere, at all times by 2030. To achieve this, it will rely on a combination of renewable energy sources, such as wind and solar, and carbon-free technologies, such as battery storage.
    
  6. Fare al chatbot una domanda che non riguarda il contesto del documento caricato. Ad esempio, potresti chiedere quanto segue:

    You: What are Google plans to colonize Mars?
    

    Un esempio di output del chatbot è simile al seguente:

    Bot: I don't know. The provided context does not mention anything about Google's plans to colonize Mars.
    

Informazioni sul codice dell'applicazione

Questa sezione spiega come funziona il codice dell'applicazione. All'interno delle immagini Docker sono presenti tre script:

  • endpoint.py: riceve gli eventi Eventarc a ogni caricamento del documento e avvia i job Kubernetes per elaborarli.
  • embedding-job.py: scarica i documenti dal bucket, crea gli embedding e li inserisce nel database vettoriale.
  • chat.py: esegue query sui contenuti dei documenti archiviati.

Il diagramma mostra il processo di generazione delle risposte utilizzando i dati dei documenti:

Nel diagramma, l'applicazione carica un file PDF, lo suddivide in blocchi, poi in vettori e infine invia i vettori a un database vettoriale. In un secondo momento, un utente pone una domanda al chatbot. La catena RAG utilizza la ricerca semantica per cercare nel database vettoriale, quindi restituisce il contesto insieme alla domanda all'LLM. L'LLM risponde alla domanda e la memorizza nella cronologia della chat.

Informazioni su endpoint.py

Questo file elabora i messaggi di Eventarc, crea un job Kubernetes per incorporare il documento e accetta richieste da qualsiasi posizione sulla porta 5001

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="elastic", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="ES_URL", value=os.getenv("ES_URL")),
        client.V1EnvVar(name="INDEX_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="elastic", name="elasticsearch-ha-es-elastic-user"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body

def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="pg-ns", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="POSTGRES_HOST", value=os.getenv("POSTGRES_HOST")),
        client.V1EnvVar(name="DATABASE_NAME", value="app"), 
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="password", name="gke-pg-cluster-app"))), 
        client.V1EnvVar(name="USERNAME", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="username", name="gke-pg-cluster-app"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace, container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="WEAVIATE_ENDPOINT", value=os.getenv("WEAVIATE_ENDPOINT")),
        client.V1EnvVar(name="WEAVIATE_GRPC_ENDPOINT", value=os.getenv("WEAVIATE_GRPC_ENDPOINT")),
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="AUTHENTICATION_APIKEY_ALLOWED_KEYS", name="apikeys"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Informazioni su embedding-job.py

Questo file elabora i documenti e li invia al database vettoriale.

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores import Qdrant
from qdrant_client import QdrantClient
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

client = QdrantClient(
    url=os.getenv("QDRANT_URL"),
    api_key=os.getenv("APIKEY"),
)
collection_name = os.getenv("COLLECTION_NAME")
vector_search = Qdrant(client, collection_name, embeddings=embedding_model)
def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bob",
        human_prefix="User",
        k=3,
    )
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])
if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bob", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from google.cloud import storage
import os

bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)

embeddings = VertexAIEmbeddings("text-embedding-005")

client = Elasticsearch(
    [os.getenv("ES_URL")], 
    verify_certs=False, 
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD"))
)

db = ElasticsearchStore.from_documents(
    documents,
    embeddings,
    es_connection=client,
    index_name=os.getenv("INDEX_NAME")
)
db.client.indices.refresh(index=os.getenv("INDEX_NAME"))

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.pgvector import PGVector
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)
for document in documents:
    document.page_content = document.page_content.replace('\x00', '')

embeddings = VertexAIEmbeddings("text-embedding-005")

CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")

db = PGVector.from_documents(
    embedding=embeddings,
    documents=documents,
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    use_jsonb=True
)

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)

embeddings = VertexAIEmbeddings("text-embedding-005")

auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config
)
client.connect()
if not client.collections.exists("trainingdocs"):
    collection = client.collections.create(name="trainingdocs")
db = WeaviateVectorStore.from_documents(documents, embeddings, client=client, index_name="trainingdocs")

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

Informazioni su chat.py

Questo file configura il modello in modo che risponda alle domande utilizzando solo il contesto fornito e le risposte precedenti. Se il contesto o la cronologia della conversazione non corrispondono ad alcun dato, il modello restituisce I don't know.

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

client = Elasticsearch(
    [os.getenv("ES_URL")], 
    verify_certs=False, 
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD"))
)
vector_search = ElasticsearchStore(
    index_name=os.getenv("INDEX_NAME"),
    es_connection=client,
    embedding=embedding_model
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores.pgvector import PGVector
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME"),

vector_search = PGVector(
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    embedding_function=embedding_model,
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config
)
client.connect()

vector_search = WeaviateVectorStore.from_documents([],embedding_model,client=client, index_name="trainingdocs")

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})