יצירת צ'אטבוט RAG באמצעות GKE ו-Cloud Storage

במדריך הזה נראה איך לשלב אפליקציה של מודל שפה גדול (LLM) שמבוססת על יצירה משולבת-אחזור (RAG) עם קובצי PDF שמעלים לקטגוריה של Cloud Storage.

במדריך הזה נעשה שימוש במסד נתונים כמנוע אחסון וחיפוש סמנטי שמכיל את הייצוגים (הטמעות) של המסמכים שהועלו. אתם משתמשים ב-framework של Langchain כדי ליצור אינטראקציה עם ההטמעות, ומשתמשים במודלים של Gemini שזמינים דרך Vertex AI.

‫Langchain הוא פריימוורק פופולרי של Python בקוד פתוח, שמפשט הרבה משימות של למידת מכונה ויש לו ממשקי משתמש לשילוב עם מסדי נתונים שונים של וקטורים ושירותי AI.

המדריך הזה מיועד למנהלים ולארכיטקטים של פלטפורמות ענן, למהנדסי ML ולמומחי MLOps (DevOps) שרוצים לפרוס אפליקציות RAG LLM ב-GKE וב-Cloud Storage.

מטרות

במדריך הזה תלמדו איך:

  • ליצור ולפרוס אפליקציה כדי ליצור הטמעות של מסמכים ולאחסן אותן במסד נתונים וקטורי.
  • אוטומציה של האפליקציה כדי להפעיל העלאות של מסמכים חדשים לקטגוריה של Cloud Storage.
  • פריסת אפליקציית צ'אטבוט שמשתמשת בחיפוש סמנטי כדי לענות על שאלות על סמך תוכן המסמך.

ארכיטקטורת פריסה

במדריך הזה תיצרו קטגוריה של Cloud Storage, טריגר Eventarc ואת השירותים הבאים:

  • embed-docs: טריגר של Eventarc מפעיל את השירות הזה בכל פעם שמשתמש מעלה מסמך חדש לקטגוריה של Cloud Storage. השירות מתחיל Kubernetes Job שיוצר הטמעות למסמך שהועלה ומכניס את ההטמעות למסד נתונים וקטורי.
  • chatbot: השירות הזה עונה על שאלות בשפה טבעית לגבי המסמכים שהועלו באמצעות חיפוש סמנטי ו-Gemini API.

בתרשים הבא מוצג תהליך ההעלאה והווקטוריזציה של מסמכים:

בתרשים, המשתמש מעלה קבצים לקטגוריה ב-Cloud Storage. ‫Eventarc נרשם לאירועים של אובייקט metadataUpdated בדלי ומשתמש במעביר האירועים של Eventarc, שהוא עומס עבודה של Kubernetes, כדי לקרוא לשירות embed-docs כשמעלים מסמך חדש. השירות יוצר הטבעות למסמך שהועלה. embed-docs השירות מאחסן את ההטמעות במסד נתונים וקטורי באמצעות מודל ההטמעה של Vertex AI.

בתרשים הבא מוצג התהליך של שאילת שאלות לגבי תוכן המסמך שהועלה באמצעות chatbot Service:

המשתמשים יכולים לשאול שאלות בשפה טבעית, והצ'אטבוט יוצר תשובות שמבוססות רק על התוכן של הקבצים שהועלו. הצ'אטבוט מאחזר הקשר ממסד הנתונים הווקטורי באמצעות חיפוש סמנטי, ואז שולח את השאלה וההקשר ל-Gemini.

עלויות

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

כשמסיימים את המשימות שמתוארות במסמך הזה אפשר למחוק את המשאבים שיצרתם כדי להימנע מחיובים נוספים. מידע נוסף זמין בקטע הסרת המשאבים.

לפני שמתחילים

במדריך הזה משתמשים ב-Cloud Shell כדי להריץ פקודות. ‫Cloud Shell היא סביבת מעטפת לניהול משאבים שמתארחים ב- Google Cloud. ב-Cloud Shell מותקנים מראש כלי שורת הפקודה Google Cloud CLI,‏ kubectl ו-Terraform. אם אתם לא משתמשים ב-Cloud Shell, אתם צריכים להתקין את Google Cloud CLI.

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. התקינו את ה-CLI של Google Cloud.

  3. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  4. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  5. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  6. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  7. מפעילים את ממשקי ה-API של Vertex AI,‏ Cloud Build,‏ Eventarc ו-Artifact Registry:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
  8. התקינו את ה-CLI של Google Cloud.

  9. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  10. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  11. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  12. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  13. מפעילים את ממשקי ה-API של Vertex AI,‏ Cloud Build,‏ Eventarc ו-Artifact Registry:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
  14. מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM: eventarc.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה, myemail@example.com.
    • ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.

יצירת אשכול

יצירת אשכול Qdrant,‏ Elasticsearch או Postgres:

Qdrant

פועלים לפי ההוראות במאמר פריסת מסד נתונים של וקטורים של Qdrant ב-GKE כדי ליצור אשכול Qdrant שפועל במצב Autopilot או במצב Standard של אשכול GKE.

Elasticsearch

פועלים לפי ההוראות במאמר פריסת מסד נתונים וקטורי של Elasticsearch ב-GKE כדי ליצור אשכול Elasticsearch שפועל באשכול GKE במצב Autopilot או במצב רגיל.

PGVector

פועלים לפי ההוראות במאמר פריסת מסד נתונים של וקטורים ב-PostgreSQL ב-GKE כדי ליצור אשכול Postgres עם PGVector שפועל במצב Autopilot או במצב רגיל של אשכול GKE.

Weaviate

פועלים לפי ההוראות לפריסת מסד נתונים וקטורי של Weaviate ב-GKE כדי ליצור אשכול Weaviate שפועל באשכול GKE במצב Autopilot או במצב רגיל.

מגדירים את הסביבה

מגדירים את הסביבה באמצעות Cloud Shell:

  1. מגדירים משתני סביבה לפרויקט:

    Qdrant

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=qdrant
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=qdrant
    

    מחליפים את PROJECT_ID במזהה הפרויקט ב-Google Cloud .

    Elasticsearch

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=elasticsearch
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=elastic
    

    מחליפים את PROJECT_ID במזהה הפרויקט ב-Google Cloud .

    PGVector

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=postgres
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=pg-ns
    

    מחליפים את PROJECT_ID במזהה הפרויקט ב-Google Cloud .

    Weaviate

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=weaviate
    export CONTROL_PLANE_LOCATION=us-central1
    export REGION=us-central1
    export DB_NAMESPACE=weaviate
    

    מחליפים את PROJECT_ID במזהה הפרויקט ב-Google Cloud .

  2. מוודאים שקלאסטר ה-GKE פועל:

    gcloud container clusters list --project=${PROJECT_ID} --location=${CONTROL_PLANE_LOCATION}
    

    הפלט אמור להיראות כך:

    NAME                                    LOCATION        MASTER_VERSION      MASTER_IP     MACHINE_TYPE  NODE_VERSION        NUM_NODES STATUS
    [KUBERNETES_CLUSTER_PREFIX]-cluster   us-central1   1.30.1-gke.1329003  <EXTERNAL IP> e2-standard-2 1.30.1-gke.1329003   6        RUNNING
    
  3. משכפלים את מאגר הקוד לדוגמה מ-GitHub:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. מנווטים לספרייה databases:

    cd kubernetes-engine-samples/databases
    

הכנת התשתית

יצירת מאגר ב-Artifact Registry, יצירת קובצי אימג' של Docker והעברת קובצי אימג' של Docker בדחיפה אל Artifact Registry:

  1. יוצרים מאגר Artifact Registry:

    gcloud artifacts repositories create ${KUBERNETES_CLUSTER_PREFIX}-images \
        --repository-format=docker \
        --location=${REGION} \
        --description="Vector database images repository" \
        --async
    
  2. מגדירים את ההרשאות storage.objectAdmin ו-artifactregistry.admin בחשבון השירות של Compute Engine כדי להשתמש ב-Cloud Build ליצירה ולדחיפה של תמונות Docker לשירותים embed-docs ו-chatbot.

    export PROJECT_NUMBER=PROJECT_NUMBER
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/artifactregistry.admin"
    

    מחליפים את PROJECT_NUMBER במספר הפרויקט ב-Google Cloud .

  3. יוצרים קובצי אימג' של Docker לשירותים embed-docs ו-chatbot. embed-docs התמונה מכילה קוד Python גם לאפליקציה שמקבלת בקשות להעברה של Eventarc וגם למשימת ההטמעה.

    Qdrant

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit qdrant/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit qdrant/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    Elasticsearch

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit elasticsearch/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit elasticsearch/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    PGVector

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit postgres-pgvector/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit postgres-pgvector/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    Weaviate

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit weaviate/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit weaviate/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    
  4. מאמתים את התמונות:

    gcloud artifacts docker images list $DOCKER_REPO \
        --project=$PROJECT_ID \
        --format="value(IMAGE)"
    

    הפלט אמור להיראות כך:

    $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/chatbot
    $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/embed-docs
    
  5. פריסת חשבון שירות של Kubernetes עם הרשאות להפעלת משימות של Kubernetes:

    Qdrant

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" qdrant/manifests/05-rag/service-account.yaml | kubectl -n qdrant apply -f -
    

    Elasticsearch

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" elasticsearch/manifests/05-rag/service-account.yaml | kubectl -n elastic apply -f -
    

    PGVector

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" postgres-pgvector/manifests/03-rag/service-account.yaml | kubectl -n pg-ns apply -f -
    

    Weaviate

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" weaviate/manifests/04-rag/service-account.yaml | kubectl -n weaviate apply -f -
    
  6. כשמשתמשים ב-Terraform כדי ליצור את אשכול GKE והערך של create_service_account מוגדר כ-true, נוצר חשבון שירות נפרד שמשמש את האשכול והצמתים. צריך להקצות את התפקיד artifactregistry.serviceAgent לחשבון השירות הזה של Compute Engine כדי שהצמתים יוכלו לשלוף תמונה מ-Artifact Registry שנוצר עבור embed-docs ו-chatbot.

    export CLUSTER_SERVICE_ACCOUNT=$(gcloud container clusters describe ${KUBERNETES_CLUSTER_PREFIX}-cluster \
    --location=${CONTROL_PLANE_LOCATION} \
    --format="value(nodeConfig.serviceAccount)")
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${CLUSTER_SERVICE_ACCOUNT}" \
    --role="roles/artifactregistry.serviceAgent"
    

    אם לא תעניקו גישה לחשבון השירות, יכול להיות שהצמתים שלכם ייתקלו בבעיית הרשאה בניסיון למשוך תמונה מ-Artifact Registry כשמנסים לפרוס את השירותים embed-docs ו-chatbot.

  7. פורסים Kubernetes Deployment לשירותים embed-docs ו-chatbot. פריסה היא אובייקט Kubernetes API שמאפשר להפעיל כמה רפליקות של Pods שמפוזרות בין הצמתים באשכול:

    Qdrant

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/chatbot.yaml | kubectl -n qdrant apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/docs-embedder.yaml | kubectl -n qdrant apply -f -
    

    Elasticsearch

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/chatbot.yaml | kubectl -n elastic apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/docs-embedder.yaml | kubectl -n elastic apply -f -
    

    PGVector

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/chatbot.yaml | kubectl -n pg-ns apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/docs-embedder.yaml | kubectl -n pg-ns apply -f -
    

    Weaviate

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/chatbot.yaml | kubectl -n weaviate apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/docs-embedder.yaml | kubectl -n weaviate apply -f -
    
  8. הפעלת טריגרים של Eventarc ל-GKE:

    gcloud eventarc gke-destinations init
    

    כשתופיע בקשה, כותבים y.

  9. פורסים את קטגוריית Cloud Storage ויוצרים טריגר Eventarc באמצעות Terraform:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=vector-database/terraform/cloud-storage init
    terraform -chdir=vector-database/terraform/cloud-storage apply \
      -var project_id=${PROJECT_ID} \
      -var region=${REGION} \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
      -var db_namespace=${DB_NAMESPACE}
    

    כשמופיעה בקשה, כותבים yes. יכול להיות שהפקודה תימשך כמה דקות.

    ‫Terraform יוצר את המשאבים הבאים:

    • קטגוריה של Cloud Storage להעלאת המסמכים
    • טריגר Eventarc
    • Google Cloud חשבון שירות בשם service_account_eventarc_name עם הרשאה להשתמש ב-Eventarc.
    • חשבון שירות בשם service_account_bucket_name עם הרשאה לקרוא את הקטגוריה ולגשת למודלים של Vertex AI. Google Cloud

    הפלט אמור להיראות כך:

    ... # Several lines of output omitted
    
    Apply complete! Resources: 15 added, 0 changed, 0 destroyed.
    
    ... # Several lines of output omitted
    

טעינת מסמכים והרצת שאילתות בצ'אטבוט

מעלים את מסמכי ההדגמה ומריצים שאילתות כדי לחפש במסמכי ההדגמה באמצעות הצ'אטבוט:

  1. מעלים את המסמך לדוגמה carbon-free-energy.pdf לקטגוריה:

    gcloud storage cp vector-database/documents/carbon-free-energy.pdf gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs
    
  2. מוודאים שמשימת הטמעת המסמך הושלמה בהצלחה:

    kubectl get job -n ${DB_NAMESPACE}
    

    הפלט אמור להיראות כך:

    NAME                            COMPLETIONS   DURATION   AGE
    docs-embedder1716570453361446   1/1           32s        71s
    
  3. מקבלים את כתובת ה-IP החיצונית של מאזן העומסים:

    export EXTERNAL_IP=$(kubectl -n ${DB_NAMESPACE} get svc chatbot --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
    echo http://${EXTERNAL_IP}:80
    
  4. פותחים את כתובת ה-IP החיצונית בדפדפן האינטרנט:

    http://EXTERNAL_IP
    

    צ'אטבוט ישיב בהודעה שדומה להודעה הבאה:

    How can I help you?
    
  5. לשאול שאלות לגבי התוכן של המסמכים שהועלו. אם הצ'אטבוט לא מוצא כלום, הוא עונה I don't know. לדוגמה, אפשר לשאול את השאלות הבאות:

    You: Hi, what are Google plans for the future?
    

    פלט לדוגמה מהצ'אטבוט:

    Bot: Google intends to run on carbon-free energy everywhere, at all times by 2030. To achieve this, it will rely on a combination of renewable energy sources, such as wind and solar, and carbon-free technologies, such as battery storage.
    
  6. לשאול את הצ'אט בוט שאלה שלא קשורה להקשר של המסמך שהועלה. לדוגמה, אתם יכולים לשאול את השאלות הבאות:

    You: What are Google plans to colonize Mars?
    

    פלט לדוגמה מהצ'אטבוט:

    Bot: I don't know. The provided context does not mention anything about Google's plans to colonize Mars.
    

מידע על קוד האפליקציה

בקטע הזה מוסבר איך קוד האפליקציה פועל. יש שלושה סקריפטים בתוך קובצי האימג' של Docker:

  • endpoint.py: מקבל אירועי Eventarc בכל העלאה של מסמך ומפעיל את משימות Kubernetes כדי לעבד אותם.
  • embedding-job.py: מוריד מסמכים מהמאגר, יוצר הטמעות ומכניס אותן למסד הנתונים הווקטורי.
  • chat.py: מריץ שאילתות על התוכן של מסמכים מאוחסנים.

בתרשים מוצג תהליך יצירת התשובות באמצעות נתונים מהמסמכים:

בתרשים, האפליקציה טוענת קובץ PDF, מפצלת את הקובץ למקטעים, ואז לוקטורים, ואז שולחת את הווקטורים למסד נתונים וקטורי. בשלב מאוחר יותר, משתמש שואל שאלה את הצ'אטבוט. שרשרת ה-RAG משתמשת בחיפוש סמנטי כדי לחפש במסד הנתונים הווקטורי, ואז מחזירה את ההקשר יחד עם השאלה ל-LLM. מודל ה-LLM עונה על השאלה ושומר אותה בהיסטוריית הצ'אט.

מידע על endpoint.py

הקובץ הזה מעבד הודעות מ-Eventarc, יוצר Kubernetes Job להטמעת המסמך ומקבל בקשות מכל מקום ביציאה 5001

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="elastic", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="ES_URL", value=os.getenv("ES_URL")),
        client.V1EnvVar(name="INDEX_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="elastic", name="elasticsearch-ha-es-elastic-user"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body

def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="pg-ns", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="POSTGRES_HOST", value=os.getenv("POSTGRES_HOST")),
        client.V1EnvVar(name="DATABASE_NAME", value="app"), 
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="password", name="gke-pg-cluster-app"))), 
        client.V1EnvVar(name="USERNAME", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="username", name="gke-pg-cluster-app"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace, container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="WEAVIATE_ENDPOINT", value=os.getenv("WEAVIATE_ENDPOINT")),
        client.V1EnvVar(name="WEAVIATE_GRPC_ENDPOINT", value=os.getenv("WEAVIATE_GRPC_ENDPOINT")),
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="AUTHENTICATION_APIKEY_ALLOWED_KEYS", name="apikeys"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

מידע על embedding-job.py

הקובץ הזה מעבד מסמכים ושולח אותם למאגר הנתונים הווקטורי.

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores import Qdrant
from qdrant_client import QdrantClient
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

client = QdrantClient(
    url=os.getenv("QDRANT_URL"),
    api_key=os.getenv("APIKEY"),
)
collection_name = os.getenv("COLLECTION_NAME")
vector_search = Qdrant(client, collection_name, embeddings=embedding_model)
def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bob",
        human_prefix="User",
        k=3,
    )
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])
if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bob", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from google.cloud import storage
import os

bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)

embeddings = VertexAIEmbeddings("text-embedding-005")

client = Elasticsearch(
    [os.getenv("ES_URL")], 
    verify_certs=False, 
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD"))
)

db = ElasticsearchStore.from_documents(
    documents,
    embeddings,
    es_connection=client,
    index_name=os.getenv("INDEX_NAME")
)
db.client.indices.refresh(index=os.getenv("INDEX_NAME"))

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.pgvector import PGVector
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)
for document in documents:
    document.page_content = document.page_content.replace('\x00', '')

embeddings = VertexAIEmbeddings("text-embedding-005")

CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")

db = PGVector.from_documents(
    embedding=embeddings,
    documents=documents,
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    use_jsonb=True
)

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)

embeddings = VertexAIEmbeddings("text-embedding-005")

auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config
)
client.connect()
if not client.collections.exists("trainingdocs"):
    collection = client.collections.create(name="trainingdocs")
db = WeaviateVectorStore.from_documents(documents, embeddings, client=client, index_name="trainingdocs")

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

מידע על chat.py

קובץ ההגדרה הזה מגדיר את המודל לענות על שאלות רק באמצעות ההקשר שסופק והתשובות הקודמות. אם ההקשר או היסטוריית השיחה לא תואמים לאף נתון, המודל מחזיר I don't know.

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try: 
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

client = Elasticsearch(
    [os.getenv("ES_URL")], 
    verify_certs=False, 
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD"))
)
vector_search = ElasticsearchStore(
    index_name=os.getenv("INDEX_NAME"),
    es_connection=client,
    embedding=embedding_model
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores.pgvector import PGVector
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME"),

vector_search = PGVector(
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    embedding_function=embedding_model,
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("text-embedding-005")

auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config
)
client.connect()

vector_search = WeaviateVectorStore.from_documents([],embedding_model,client=client, index_name="trainingdocs")

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

הסרת המשאבים

כדי להימנע מחיובים בחשבון Google Cloud בגלל השימוש במשאבים שנעשה במסגרת המדריך הזה, אפשר למחוק את הפרויקט שמכיל את המשאבים, או להשאיר את הפרויקט ולמחוק את המשאבים בנפרד.

מחיקת הפרויקט

הדרך הקלה ביותר להימנע מחיוב היא למחוק את הפרויקט שיצרתם בשביל המדריך הזה.

כדי למחוק Google Cloud פרויקט:

gcloud projects delete PROJECT_ID

אם מחקתם את הפרויקט, סיימתם את הניקוי. אם לא מחקתם את הפרויקט, צריך למחוק את המשאבים בנפרד.

מחיקת משאבים בודדים

  1. מחיקת מאגר Artifact Registry:

    gcloud artifacts repositories delete ${KUBERNETES_CLUSTER_PREFIX}-images \
        --location=${REGION} \
        --async
    

    כשמופיעה בקשה, כותבים y.

  2. מחיקת קטגוריית Cloud Storage והטריגר של Eventarc:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=vector-database/terraform/cloud-storage destroy \
      -var project_id=${PROJECT_ID} \
      -var region=${REGION} \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
      -var db_namespace=${DB_NAMESPACE}
    

    כשמופיעה בקשה, כותבים yes.

    כדי ליצור ולמחוק נקודת יעד ב-Eventarc, צריך לוודא שהיא תקפה.

המאמרים הבאים