פיתוח סוכן נתונים באמצעות HTTP ו-Python

בדף הזה נסביר איך להשתמש ב-Python כדי לשלוח בקשות HTTP אל Conversational Analytics API (אפשר לגשת אליו דרך geminidataanalytics.googleapis.com).

בדף הזה יש דוגמאות לקוד Python שמראות איך לבצע את הפעולות הבאות:

בסוף הדף מופיעה גרסה מלאה של קוד לדוגמה, יחד עם פונקציות עזר שמשמשות להזרמת תגובת ה-API.

הגדרת ההגדרות הראשוניות והאימות

קוד Python לדוגמה שמבצע את המשימות האלה:

  • ייבוא של ספריות Python הנדרשות
  • קבלת אסימון גישה לאימות HTTP באמצעות Google Cloud CLI
  • הגדרת משתנים לפרויקט החיוב, למיקום ולהוראות המערכת
import json
import json as json_lib
import textwrap

import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
}

billing_project = 'YOUR-BILLING-PROJECT'
location = "global"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: המזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.
  • YOUR-SYSTEM-INSTRUCTIONS: הוראות למערכת שמנחות את התנהגות הסוכן ומאפשרות להתאים אותה לצורכי הנתונים שלכם. לדוגמה, אתם יכולים להשתמש בהוראות מערכת כדי להגדיר מונחים עסקיים, לשלוט באורך התשובה או להגדיר את פורמט הנתונים. מומלץ להגדיר את ההוראות למערכת באמצעות פורמט ה-YAML המומלץ במאמר איך כותבים הוראות למערכת בצורה יעילה כדי לספק הנחיות מפורטות ומובְנות.

אימות ל-Looker

אם אתם מתכננים להתחבר למקור נתונים של Looker, תצטרכו לבצע אימות למכונה של Looker.

שימוש במפתחות API

בדוגמת הקוד הבאה ב-Python אפשר לראות איך מאמתים את הסוכן למופע Looker באמצעות מפתחות API.

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-LOOKER-CLIENT-ID: מזהה הלקוח של מפתח Looker API שנוצר.
  • YOUR-LOOKER-CLIENT-SECRET: ה-client secret של מפתח Looker API שנוצר.

שימוש באסימוני גישה

בדוגמת הקוד הבאה ב-Python אפשר לראות איך מאמתים את הסוכן למופע Looker באמצעות טוקנים של גישה.

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-TOKEN: הערך access_token שיוצרים כדי לבצע אימות ב-Looker.

אימות ב-AlloyDB ל-PostgreSQL

אם אתם מתכננים להתחבר למקור נתונים של AlloyDB, אתם צריכים להשתמש ב-CLI של gcloud כדי לבצע אימות למכונת AlloyDB.

  1. כדי להפעיל אימות של ניהול זהויות והרשאות גישה (IAM) ב-AlloyDB, צריך להפעיל את הדגל alloydb.iam_authentication במופע AlloyDB.
  2. הקצאת תפקידים ב-IAM. למשתמש או לחשבון השירות שקוראים ל-Conversational Analytics API צריכות להיות הרשאות IAM מתאימות בGoogle Cloud פרויקט. בדרך כלל צוות כזה כולל תפקידים כמו alloydb.databaseUser.
  3. יוצרים משתמש מסד נתונים תואם. יוצרים משתמש ב-AlloyDB שמשקף את הישות המורשית ב-IAM, באמצעות כתובת האימייל בחשבון של המשתמש או של חשבון השירות כשם המשתמש.

מידע נוסף זמין במאמר בנושא אימות מסד נתונים באמצעות IAM.

אימות ב-Cloud SQL ל-MySQL וב-Cloud SQL ל-PostgreSQL

אם אתם מתכננים להתחבר למקור נתונים של Cloud SQL ל-MySQL או Cloud SQL ל-PostgreSQL, אתם צריכים לבצע אימות למכונה של Cloud SQL.

  1. מפעילים אימות מסד נתונים של IAM. מוודאים שהדגל cloudsql.iam_authentication מוגדר ל-on במופע Cloud SQL ל-MySQL. אפשר להוסיף את זה כשיוצרים מופע או כשמבצעים תיקון למופע קיים.

    gcloud sql instances patch INSTANCE_NAME --database-flags
    cloudsql.iam_authentication=on
    
  2. מעניקים את התפקידים הנדרשים ב-IAM.

    לפרינציפל (משתמש או חשבון שירות) שמנסה להתחבר צריכות להיות הרשאות ה-IAM‏ cloudsql.instances.login ו-cloudsql.instances.connect בפרויקט. כדי להעניק את התפקידים וההרשאות האלה, אפשר לבצע אחת מהפעולות הבאות:

    1. הקצאת התפקיד roles/cloudsql.instanceUser. ההרשאות האלה כוללות הרשאה להיכנס למופע (cloudsql.instances.login) ולהתחבר באמצעות ה-proxy (cloudsql.instances.connect).
    2. אפשרות אחרת היא להקצות תפקיד בהתאמה אישית שכולל לפחות את ההרשאות cloudsql.instances.login ו-cloudsql.instances.connect.
  3. מוסיפים את הגורם הראשי ב-IAM כמשתמש במסד הנתונים. יוצרים משתמש במופע Cloud SQL שמתאים לכתובת האימייל של הגורם המורשה ב-IAM.

    כדי להשתמש בחשבון שירות, מריצים את הפקודה הבאה:

    gcloud sql users create SERVICE_ACCOUNT_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_service_account
    

    כדי להגדיר חשבון משתמש, מריצים את הפקודה הבאה:

    gcloud sql users create USER_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_user
    
  4. הענקת הרשאות למסד הנתונים. התחברות באמצעות IAM מאמתת את המשתמש במסד הנתונים, אבל לא מעניקה הרשאות במסד הנתונים באופן אוטומטי. צריך להשתמש בפקודות PostgreSQL רגילות – כמו GRANT – כדי לתת למשתמש החדש במסד הנתונים הרשאות לאובייקטים ספציפיים כמו טבלאות וסכימות.

  5. מתחברים בתור משתמש על – כמו משתמש postgres שמוגדר כברירת מחדל – ומריצים את הפקודות הבאות:

    -- Example: Connect using psql as the 'postgres' user
    
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO
    "user-or-service-account@example.com";
    
  6. מתחברים באמצעות אימות IAM.

    בדרך כלל, לקוחות מתחברים דרך שרת proxy ל-Cloud SQL Auth או דרך ספריית מחברים שמטפלת בהחלפת האסימונים. שם המשתמש במסד הנתונים הוא כתובת האימייל של הגורם המורשה ב-IAM. לא נעשה שימוש בסיסמאות. במקום זאת, נעשה שימוש באסימון OAuth 2.0, שבדרך כלל מטופל באופן אוטומטי על ידי שרת ה-proxy ל-Cloud SQL Auth או על ידי Google Cloud CLI.

    בדוגמה הבאה נעשה שימוש ב-psql וב-CLI של gcloud כדי ליצור טוקן:

    PGPASSWORD=$(gcloud sql generate-login-token) psql --host=HOST_IP \
    --username=IAM_PRINCIPAL_EMAIL --dbname=DATABASE_NAME\
    

    מידע נוסף זמין במאמר אימות IAM.

    מחליפים את הערכים לדוגמה באופן הבא:

    • INSTANCE_NAME: המזהה של מופע Cloud SQL שרוצים להחיל עליו תיקון כדי להפעיל אימות מסד נתונים של IAM.
    • SERVICE_ACCOUNT_EMAIL: כתובת האימייל של חשבון השירות.
    • USER_EMAIL: כתובת האימייל של חשבון המשתמש.
    • HOST_IP: כתובת ה-IP של מופע Cloud SQL.
    • IAM_PRINCIPAL_EMAIL: כתובת האימייל של הגורם המרכזי ב-IAM – לדוגמה, חשבון שירות או משתמש.
    • DATABASE_NAME: השם של מסד הנתונים שאליו רוצים להתחבר.

אימות ב-Spanner

אם אתם מתכננים להתחבר למקור נתונים של Spanner, אתם צריכים לבצע אימות למופע Spanner.

  1. הקצאת תפקידים ב-IAM. למשתמש או לחשבון השירות שקוראים ל-Conversational Analytics API צריכות להיות הרשאות IAM מתאימות בGoogle Cloud פרויקט. בדרך כלל צוות כזה כולל תפקידים כמו databaseUser.
  2. יוצרים תפקידי מסד נתונים מופשטים תואמים ב-Spanner באמצעות שפת הגדרת נתונים (DDL).
  3. משתמשים בכללי המדיניות ב-IAM כדי לאפשר לחשבונות משתמשים ב-IAM לקבל את תפקידי מסד הנתונים האלה.

מידע נוסף זמין במאמר הרשאות בקרת גישה ברמת פירוט גבוהה.

התחברות למקור נתונים

בקטעים הבאים מוסבר איך להגדיר את פרטי החיבור למקורות הנתונים של הסוכן. הסוכן יכול להתחבר לנתונים בדרכים הבאות:

חיבור לנתונים ב-Looker

בדוגמת הקוד הבאה מוגדר חיבור ל-Looker Explore. כדי ליצור חיבור למופע Looker, צריך לוודא שיצרתם מפתחות Looker API, כמו שמתואר במאמר אימות וחיבור למקור נתונים באמצעות Conversational Analytics API. אפשר להתחבר לעד חמישה Looker Explores בכל פעם באמצעות Conversational Analytics API.

כשמתחברים למקור נתונים של Looker, חשוב לשים לב לנקודות הבאות:

  • אתם יכולים לשאול שאלות על כל דוח Explore שכלול בשיחה.
  • סוכן יכול לשלוח שאילתה רק ל-Explore אחד בכל פעם. אי אפשר להריץ שאילתות בכמה ניתוחים בו-זמנית.
  • נציג יכול לשאול שאלות בכמה ניתוחים באותה שיחה.
  • סוכן יכול לשלוח שאילתות לכמה ניתוחים בשיחה שכוללת שאלות עם כמה חלקים, או בשיחות שכוללות שאלות המשך.

    לדוגמה: משתמש מחבר שני דוחות Explore, אחד בשם cat-explore ואחד בשם dog-explore. המשתמש מזין את השאלה 'מה גדול יותר: מספר החתולים או מספר הכלבים?' כך יישאלו שתי שאילתות: אחת לספירת מספר החתולים ב-cat-explore ואחת לספירת מספר הכלבים ב-dog-explore. הנציג משווה את המספר משתי השאילתות אחרי השלמתן.

קוד לדוגמה הבא מגדיר חיבור לכמה Looker Explores. כדי לשפר את הביצועים של הסוכן, אתם יכולים לספק שאילתות לדוגמה כהקשר מובנה לניתוחים שלכם. מידע נוסף על הגדרת ההקשר של סוכן נתונים למקורות נתונים של Looker

looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
    "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
       # Do not include the following line during agent creation
       "credentials": looker_credentials
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • https://your_company.looker.com: כתובת ה-URL המלאה של מופע Looker.
  • your_model: השם של מודל LookML שכולל את כלי הניתוח שרוצים להתחבר אליו.
  • your_explore: השם של Looker Explore שרוצים שהסוכן לגישה לנתונים ישלח אליו שאילתה.
  • your_model_2: השם של מודל LookML השני שכולל את התכונה Explore שאליה רוצים להתחבר. אפשר לחזור על המשתנה הזה עבור מודלים נוספים, עד חמישה ניתוחים.
  • your_explore_2: השם של Looker Explore נוסף שרוצים שהסוכן לגישה לנתונים ישלח אליו שאילתה. אפשר לחזור על המשתנה הזה כדי לכלול עד חמישה ניתוחים.

התחברות לנתוני BigQuery

ב-Conversational Analytics API, אין מגבלות קשיחות על מספר הטבלאות ב-BigQuery שאפשר להתחבר אליהן. עם זאת, חיבור למספר גדול של טבלאות עלול להפחית את רמת הדיוק או לגרום לחריגה ממגבלת הטוקנים של הקלט במודל.

בדוגמת קוד לדוגמה הבאה מוגדר חיבור לכמה טבלאות ב-BigQuery, והיא כוללת דוגמאות לשדות אופציונליים של הקשר מובנה. כדי לשפר את הביצועים של הסוכן, אתם יכולים לספק הקשר מובנה לטבלאות BigQuery, כמו תיאורים של טבלאות ועמודות, מילים נרדפות, תגים ושאילתות לדוגמה. מידע נוסף זמין במאמר בנושא הגדרת ההקשר של סוכן הנתונים למקורות נתונים של BigQuery.

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id",
                "schema": {
                    "description": "my_table_description",
                    "fields": [{
                        "name": "my_column_name",
                        "description": "my_column_description"
                    }]
                }
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • my_project_id: המזהה של הפרויקט Google Cloud שמכיל את מערך הנתונים והטבלה ב-BigQuery שאליהם רוצים להתחבר. כדי להתחבר למערך נתונים ציבורי, מציינים את bigquery-public-data.
  • my_dataset_id: המזהה של מערך הנתונים ב-BigQuery.
  • my_table_id: המזהה של הטבלה ב-BigQuery.
  • my_table_description: תיאור אופציונלי של תוכן הטבלה והמטרה שלה.
  • my_column_name: שם העמודה בטבלה שרוצים לספק לה תיאור אופציונלי.
  • my_column_description: תיאור אופציונלי של התוכן והמטרה של העמודה.

התחברות לנתונים של Data Studio

בדוגמת הקוד הבאה מוגדר חיבור למקור נתונים ב-Data Studio.

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

מחליפים את studio_datasource_id במזהה של מקור הנתונים.

חיבור לנתוני AlloyDB

ב-Conversational Analytics API נעשה שימוש בשדה database_reference בשדה alloydb כדי להתחבר לאשכול AlloyDB.

בדוגמה הבאה מוגדר חיבור למסד נתונים של AlloyDB.

alloydb_data_source = {
    "alloydb": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }

    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • PROJECT_ID: מזהה Google Cloud הפרויקט שמכיל את אשכול AlloyDB.
  • REGION: האזור שבו נמצא אשכול AlloyDB, לדוגמה us-central1.
  • CLUSTER_ID: המזהה של אשכול AlloyDB.
  • INSTANCE_ID: המזהה של מופע AlloyDB.
  • DATABASE: השם של מסד הנתונים של היעד – לדוגמה, financial.
  • TABLE_1_ID, TABLE_2_ID: אופציונלי. רשימה של הטבלאות במסד הנתונים שהסוכן הדיגיטלי מציע להשתמש בהן – לדוגמה, "loan", "client", "disp".
  • billing_project: המזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.
  • location: המיקום שבו נמצאת קבוצת ההקשר. לדוגמה: global.
  • your_context_set_id: אם יצרתם הקשר מתקדם לסוכן, המזהה של קבוצת ההקשר.

חיבור לנתונים של Cloud SQL ל-MySQL ו-Cloud SQL ל-PostgreSQL

ה-API של ניתוח שיחות משתמש בשדה database_reference בשדה sql כדי להתחבר למופע Cloud SQL.

בדוגמה הבאה מוגדר חיבור למסד נתונים של Cloud SQL ל-MySQL או של Cloud SQL ל-PostgreSQL.

sql_data_source = {
    "cloud_sql_reference": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE_ID",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • PROJECT_ID: המזהה של Google Cloud הפרויקט שמכיל את מופע Cloud SQL.
  • REGION: האזור שבו נמצא מופע Cloud SQL – לדוגמה, us-central1.
  • CLUSTER_ID: המזהה של אשכול Cloud SQL.
  • INSTANCE_ID: המזהה של מופע Cloud SQL.
  • DATABASE_ID: השם של מסד הנתונים של היעד – לדוגמה, financial.
  • TABLE_1_ID, TABLE_2_ID: אופציונלי. רשימה של הטבלאות במסד הנתונים שהסוכן הדיגיטלי מציע להשתמש בהן – לדוגמה, "loan", "client", "disp".
  • billing_project: המזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.
  • location: המיקום שבו נמצאת קבוצת ההקשר. לדוגמה: global.
  • your_context_set_id: אם יצרתם הקשר מתקדם לסוכן, המזהה של קבוצת ההקשר.

חיבור לנתוני Spanner

‫Conversational Analytics API משתמש בשדה database_reference בשדה spanner כדי להתחבר למופע Spanner.

בדוגמה הבאה מוגדר חיבור למסד נתונים של Spanner.

spanner_data_sources = {
    "spanner_reference": {
        "database_reference": {
            "project_id": "PROJECT_ID",
            "region": location,
            "engine": GOOGLE_SQL,
            "instance_id": "INSTANCE_ID",
            "database_id": "DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
        },
        # Optional: Include this if you have pre-authored context for the agent
        # "agent_context_reference": {
        #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
        # }
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • PROJECT_ID: מזהה Google Cloud הפרויקט שמכיל את מופע Spanner.
  • INSTANCE_ID: המזהה של מכונת Spanner.
  • DATABASE: השם של מסד הנתונים של היעד – לדוגמה, financial.
  • TABLE_1_ID, TABLE_2_ID: רשימה של הטבלאות במסד הנתונים שהסוכן לניהול נתונים מציע להשתמש בהן – לדוגמה, "loan", "client", "disp".
  • billing_project: המזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.
  • location: המיקום שבו נמצאת קבוצת ההקשר. לדוגמה: global.
  • your_context_set_id: אם יצרתם הקשר מתקדם לסוכן, המזהה של קבוצת ההקשר.

יצירת סוכן נתונים

דוגמת קוד לדוגמה הבאה מדגימה איך ליצור את סוכן הנתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP POST לנקודת הקצה ליצירת סוכן נתונים. מטען הייעודי (payload) של הבקשה כולל את הפרטים הבאים:

  • השם המלא של המשאב של הסוכן. הערך הזה כולל את מזהה הפרויקט, המיקום ומזהה ייחודי של הסוכן.
  • תיאור של סוכן הנתונים.
  • ההקשר של סוכן הנתונים, כולל תיאור המערכת (מוגדר בקטע הגדרת ההגדרות הראשוניות והאימות) ומקור הנתונים שבו הסוכן משתמש (מוגדר בקטע קישור למקור נתונים).

אפשר להגן על סוכן הנתונים באמצעות מפתחות הצפנה בניהול הלקוח (CMEK) על ידי אספקת kms_key במהלך היצירה. הצפנה באמצעות מפתח משל לקוח נתמכת רק במקורות נתונים של Looker. מידע נוסף מופיע במאמר בנושא מפתחות הצפנה בניהול הלקוח (CMEK).

אפשר גם להפעיל ניתוח מתקדם באמצעות Python על ידי הוספת הפרמטר options למטען הייעודי (payload) של הבקשה. מידע נוסף על הפרמטר options ועל האפשרויות שאפשר להגדיר לשיחה זמין במאמר REST Resource: projects.locations.dataAgents.

סינכרוני

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents:createSync"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

אסינכרוני

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: מזהה ייחודי של סוכן הנתונים. הערך הזה משמש בשם המשאב של הסוכן וכפרמטר השאילתה של כתובת האתר data_agent_id.
  • KMS_PROJECT_ID: אם משתמשים במפתח הצפנה שבניהול הלקוח (CMEK), מזהה הפרויקט שבו מתארח המפתח. אם לא תציינו פרויקט, ברירת המחדל תהיה פרויקט החיוב שלכם.
  • KEY_RING_NAME: אם משתמשים ב-CMEK, השם של אוסף המפתחות ב-Cloud KMS.
  • KEY_NAME: אם משתמשים ב-CMEK, השם של מפתח Cloud KMS.
  • This is the description of data_agent_1.: תיאור של סוכן הנתונים.

יצירת שיחה

בדוגמת הקוד הבאה אפשר לראות איך יוצרים שיחה עם סוכן הנתונים.

אפשר גם להגן על השיחה באמצעות CMEK על ידי ציון kms_key במהלך היצירה. הצפנה באמצעות מפתח משל לקוח נתמכת רק במקורות נתונים של Looker. מידע נוסף מופיע במאמר בנושא מפתחות הצפנה בניהול הלקוח (CMEK).

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
}

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  conversation_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
  • conversation_1: מזהה ייחודי של השיחה.
  • KMS_PROJECT_ID: אם משתמשים במפתח הצפנה שבניהול הלקוח (CMEK), מזהה הפרויקט שבו מתארח המפתח. אם לא תציינו פרויקט, ברירת המחדל תהיה פרויקט החיוב שלכם.
  • KEY_RING_NAME: אם משתמשים ב-CMEK, השם של אוסף המפתחות ב-Cloud KMS.
  • KEY_NAME: אם משתמשים ב-CMEK, השם של מפתח Cloud KMS.

ניהול סוכני נתונים ושיחות

בדוגמאות הקוד הבאות אפשר לראות איך לנהל את סוכני הנתונים והשיחות באמצעות Conversational Analytics API. אפשר לבצע את המשימות הבאות:

קבלת סוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך מאחזרים סוכן נתונים קיים על ידי שליחת בקשת HTTP GET לכתובת ה-URL של משאב סוכן הנתונים.

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

בדוגמה הקודמת, מחליפים את data_agent_1 במזהה של סוכן הנתונים שרוצים לאחזר.

הצגת רשימה של סוכני נתונים

בדוגמת הקוד הבאה אפשר לראות איך לשלוח בקשת HTTP‏ GET לנקודת הקצה dataAgents כדי להציג רשימה של כל סוכני הנתונים בפרויקט נתון.

כדי לראות את רשימת כל הסוכנים, צריך את ההרשאה geminidataanalytics.dataAgents.list בפרויקט. למידע נוסף על תפקידי IAM שכוללים את ההרשאה הזו, אפשר לעיין ברשימת התפקידים המוגדרים מראש.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

מחליפים את YOUR-BILLING-PROJECT במזהה פרויקט החיוב.

הצגת רשימה של סוכני נתונים שאפשר לגשת אליהם

בדוגמת הקוד הבאה מוצג איך לשלוח בקשת HTTP GET לנקודת הקצה dataAgents:listAccessible כדי לקבל רשימה של כל סוכני הנתונים שאפשר לגשת אליהם בפרויקט נתון.

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • YOUR-CREATOR-FILTER: המסנן להחלה על סמך יוצר סוכן הנתונים. הערכים האפשריים כוללים NONE (ברירת מחדל), CREATOR_ONLY ו-NOT_CREATOR_ONLY.

עדכון סוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך מעדכנים סוכן נתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP PATCH לכתובת ה-URL של משאב סוכן הנתונים. מטען הייעודי (payload) של הבקשה כולל את הערכים החדשים של השדות שרוצים לשנות, והפרמטרים של הבקשה כוללים פרמטר updateMask שמציין את השדות שצריך לעדכן.

סינכרוני

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:updateSync"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

אסינכרוני

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים שרוצים לעדכן.
  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • Updated description of the data agent.: התיאור החדש של סוכן הנתונים.

קבלת מדיניות IAM לסוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך לאחזר את מדיניות IAM של סוכן נתונים על ידי שליחת בקשת HTTP POST לכתובת ה-URL של סוכן הנתונים. המטען הייעודי (payload) של הבקשה כולל את הנתיב של סוכן הנתונים.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • data_agent_1: המזהה של סוכן הנתונים שעבורו רוצים לקבל את מדיניות ה-IAM.

הגדרת מדיניות IAM לסוכן נתונים

כדי לשתף סוכן, אפשר להשתמש בשיטה setIamPolicy כדי להקצות תפקידי IAM למשתמשים בסוכן ספציפי. בדוגמת קוד לדוגמה הבאה אפשר לראות איך לבצע קריאה מסוג POST לכתובת ה-URL של סוכן הנתונים עם מטען ייעודי (payload) שכולל קישורים. הקישור מציין אילו תפקידים צריך להקצות לאילו משתמשים.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • data_agent_1: המזהה של סוכן הנתונים שעבורו רוצים להגדיר את מדיניות ה-IAM.
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: רשימה מופרדת בפסיקים של כתובות אימייל של משתמשים שרוצים להקצות להם את התפקיד שצוין.

מחיקה של סוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך לבצע מחיקה רכה של סוכן נתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP DELETE לכתובת ה-URL של משאב סוכן הנתונים. מחיקה רכה פירושה שהסוכן נמחק אבל עדיין אפשר לשחזר אותו תוך 30 יום.

סינכרוני

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:deleteSync"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

אסינכרוני

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • data_agent_1: המזהה של סוכן הנתונים שרוצים למחוק.

קבלת שיחה

בדוגמת הקוד הבאה אפשר לראות איך מאחזרים שיחה קיימת על ידי שליחת בקשת HTTP GET לכתובת ה-URL של משאב השיחה.

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • conversation_1: המזהה של השיחה שרוצים לאחזר.

רשימת השיחות

בדוגמת קוד לדוגמה הבאה אפשר לראות איך לשלוח בקשת HTTP‏ GET לנקודת הקצה conversations כדי להציג רשימה של שיחות בפרויקט נתון.

כברירת מחדל, השיטה הזו מחזירה את השיחות שיצרתם. אדמינים (משתמשים עם cloudaicompanion.topicAdmin תפקיד IAM) יכולים לראות את כל השיחות בפרויקט.

billing_project = "YOUR-BILLING-PROJECT"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את YOUR-BILLING-PROJECT במזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.

איך רואים את רשימת ההודעות בשיחה

בדוגמת קוד לדוגמה הבאה מוצג איך לשלוח בקשת HTTP GET לנקודת הקצה messages של השיחה כדי לקבל רשימה של כל ההודעות בשיחה.

כדי להציג את רשימת ההודעות, צריכה להיות לכם הרשאה של cloudaicompanion.topics.get בשיחה.

billing_project = "YOUR-BILLING-PROJECT"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • conversation_1: המזהה של השיחה שרוצים להציג את ההודעות שלה.

איך מוחקים שיחות

בדוגמת הקוד הבאה אפשר לראות איך מוחקים שיחה על ידי שליחת בקשת HTTP DELETE לכתובת ה-URL של משאב השיחה. אדמינים (משתמשים עם התפקיד cloudaicompanion.topicAdmin בניהול הזהויות והרשאות הגישה) או משתמשים עם הרשאת cloudaicompanion.topics.delete בניהול הזהויות והרשאות הגישה יכולים למחוק שיחות בפרויקט.

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.delete(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation deleted successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while deleting conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • conversation_1: המזהה של השיחה שרוצים למחוק.

איך לשאול שאלות באמצעות ה-API

אחרי שיוצרים סוכן נתונים ושיחה (במקרה של צ'אט עם שמירת מצב), אפשר לשלוח שאילתות לסוכן.

כששולחים שאילתה, ה-API מחזיר זרם של אובייקטים מסוג Message. הזרם הזה יכול להכיל סוגים שונים של הודעות, כולל טקסט, טבלאות נתונים ותרשימים. הודעות טקסט יכולות לספק תובנות לגבי ההיגיון של הסוכן, לדווח על ההתקדמות שלו או לתת את התשובה הסופית. הערך של TextType מציין את המטרה של כל הודעת טקסט:

  • THOUGHT: הצגת תהליך החשיבה הפנימי של הסוכן בזמן שהוא מתכנן איך לענות על השאילתה. הודעות THOUGHT מספקות תובנות שלב אחר שלב לגבי תהליך החשיבה וקבלת ההחלטות של הנציג, והן כוללות שני חלקים: parts[0] הוא סיכום החשיבה, שכולל סיכום קצר של טקסט החשיבה המלא, וparts[1] הוא טקסט החשיבה המלא.
  • PROGRESS: דיווח על התקדמות הסוכן בפעולה, כמו אחזור נתונים או הפעלה של כלי. הערך הזה מוחזר רק למקורות נתונים של Looker והוא מכיל שני חלקים: parts[0] הוא הסיכום ו-parts[1] הוא הטקסט המלא של ההתקדמות.
  • FINAL_RESPONSE: מספק את התשובה הסופית לשאילתה.

בדוגמאות שבקטע הבא נעשה שימוש בפונקציות העזר שמוגדרות בפונקציות עזר של Python להזרמת תשובות בצ'אט כדי לעבד ולהציג כל הודעה בתגובה המוזרמת של ה-API. הוראות להצגת ההודעות האלה בממשק משתמש כשמשתמשים במקורות נתונים של Looker מופיעות במאמר הצגת תשובות של סוכנים למקורות נתונים של Looker.

שליחת שאלות בצ'אט עם שמירת מצב ובצ'אט ללא שמירת מצב

אפשר ליצור אינטראקציה עם ה-API במצבים העיקריים הבאים:

  • צ'אט עם שמירת מצב: Google Cloud שומר ומנהל את היסטוריית השיחות. צ'אט עם שמירת מצב הוא מטבעו רב-שלבי, כי ה-API שומר את ההקשר מההודעות הקודמות. צריך לשלוח רק את ההודעה הנוכחית בכל תור.
  • צ'אט ללא שמירת מצב: האפליקציה שלכם מנהלת את היסטוריית השיחות. צריך לכלול את ההודעות הקודמות הרלוונטיות בכל הודעה חדשה. דוגמאות מפורטות לאופן ניהול שיחות מרובות תפניות במצב חסר מצב זמינות במאמר יצירת שיחה מרובת תפניות במצב חסר מצב.

הדוגמאות הבאות מדגימות איך להשתמש ב-API לצ'אט עם שמירת מצב ולצ'אט בלי שמירת מצב, על ידי שליחת בקשת POST לנקודת הקצה :chat.

צ'אט עם שמירת מצב

שליחת בקשה לצ'אט עם שמירת מצב עם הפניה לשיחה

בדוגמת הקוד הבאה אפשר לראות איך שואלים שאלות ב-API באמצעות השיחה שהגדרתם בשלבים הקודמים. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התשובה.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
  • conversation_1: מזהה ייחודי של השיחה.
  • ההנחיה לדוגמה הייתה Make a bar graph for the top 5 states by the total number of airports.

צ'אט ללא שמירת מצב

בדוגמאות הבאות מוצגות דרכים לשליחת בקשות בלי שמירת מצב באמצעות הפניה לסוכן נתונים או באמצעות הקשר מוטבע.

שליחת בקשת צ'אט בלי שמירת מצב עם הפניה לסוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך שואלים את ה-API שאלה בלי מצב (stateless) באמצעות סוכן הנתונים שהגדרתם בשלבים הקודמים. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התשובה.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
  • ההנחיה לדוגמה הייתה Make a bar graph for the top 5 states by the total number of airports.

שליחת בקשה לצ'אט ללא שמירת מצב עם הקשר מוטבע

בדוגמת הקוד הבאה אפשר לראות איך לשאול את ה-API שאלה ללא מצב (stateless) באמצעות הקשר בתוך השורה. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התגובה, ומקור נתונים של BigQuery משמש כדוגמה.

אפשר גם להפעיל ניתוח מתקדם באמצעות Python על ידי הוספת הפרמטר options למטען הייעודי (payload) של הבקשה. מידע נוסף על הפרמטר options ועל האפשרויות שניתן להגדיר לשיחה זמין בדף REST Resource: projects.locations.dataAgents.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

קוד לדוגמה הבא מדגים איך לשאול את ה-API שאלה בלי שמירת מצב באמצעות הקשר מוטבע ומקור נתונים של AlloyDB כדוגמה.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

# The natural language question to ask the data agent
user_prompt = "what is the average loan amount in the US?"  # Replace with your question

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": user_prompt
            }
        }
    ],
    "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
    }
}

print(f"Sending prompt to :chat: '{user_prompt}'")
print(f"Endpoint: {chat_url}")
print(f"Payload: {json.dumps(chat_payload, indent=2)}")

get_stream(chat_url, chat_payload)

יצירת שיחה רב-שלבית בלי שמירת מצב

כדי לשאול שאלות המשך בשיחה בלי שמירת מצב, האפליקציה צריכה לנהל את ההקשר של השיחה על ידי שליחת היסטוריית ההודעות המלאה עם כל בקשה חדשה. בקטעים הבאים מוסבר איך להגדיר פונקציות עזר ולקרוא להן כדי ליצור שיחה רב-שלבית:

שליחת בקשות מרובות שלבים

פונקציית העזרה multi_turn_Conversation הבאה מנהלת את ההקשר של השיחה על ידי אחסון ההודעות ברשימה. כך תוכלו לשלוח שאלות המשך שמבוססות על תגובות קודמות. במטען הייעודי (payload) של הפונקציה, אפשר להפנות לסוכן נתונים או לספק את מקור הנתונים ישירות באמצעות הקשר בתוך השורה.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

בדוגמה הקודמת, מחליפים את data_agent_1 במזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.

אפשר לקרוא לפונקציית העזר multi_turn_Conversation בכל תור של השיחה. בדוגמת הקוד הבאה אפשר לראות איך לשלוח בקשה ראשונית ואז בקשת המשך שמבוססת על התשובה הקודמת.

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

בדוגמה הקודמת, מחליפים את הערכים לדוגמה באופן הבא:

  • Which species of tree is most prevalent?: שאלה בשפה טבעית לשליחה לסוכן הנתונים.
  • Can you show me the results as a bar chart?: שאלה המשך שמבוססת על השאלה הקודמת או מחדדת אותה.

עיבוד התשובות

הפונקציה get_stream_multi_turn הבאה מעבדת את התשובה של ה-API לסטרימינג. הפונקציה הזו דומה לפונקציית העזר get_stream, אבל היא שומרת את התגובה ברשימה conversation_messages כדי לשמור את הקשר השיחה לתור הבא.

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

דוגמאות קוד מקצה לקצה

דוגמאות הקוד הבאות ניתנות להרחבה וכוללות את כל המשימות שמפורטות במדריך הזה.

איך יוצרים סוכן נתונים ל-BigQuery ול-Looker באמצעות HTTP ו-Python

    import json
    import json as json_lib
    import textwrap

    import altair as alt
    import google.auth
    from google.auth.transport.requests import Request
    import IPython
    from IPython.display import display, HTML
    import pandas as pd
    from pygments import formatters, highlight, lexers
    import requests

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
        # Do not include the following line during agent creation
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references": [
                {
                  "studio_datasource_id": "studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

הגדרת פונקציות עזר

קוד לדוגמה הבא מכיל הגדרות של פונקציות עזר שמשמשות בדוגמאות הקוד הקודמות. הפונקציות האלה עוזרות לנתח תשובות מממשק ה-API ולהציג את התוצאות.

פונקציות עזר של Python להזרמת תשובות בצ'אט

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      full_text = "".join(parts)
      if "\n" not in full_text and len(full_text) > 80:
        wrapped_text = textwrap.fill(full_text, width=80)
        print(wrapped_text)
      else:
        print(full_text)

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        if 'question' in query:
          print('Question: {}'.format(query['question']))
        if 'datasources' in query:
          print('Data sources:')
          for datasource in query['datasources']:
            display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''