פיתוח סוכן נתונים באמצעות HTTP ו-Python

בדף הזה מוסבר איך להשתמש ב-Python כדי לשלוח בקשות HTTP אל Conversational Analytics API (שניתן לגשת אליו דרך geminidataanalytics.googleapis.com).

בדף הזה יש דוגמה לקוד Python שמראה איך לבצע את הפעולות הבאות:

בסוף הדף מופיעה גרסה מלאה של קוד לדוגמה, וגם פונקציות עזר שמשמשות להזרמת התגובה של ה-API.

הגדרת ההגדרות הראשוניות והאימות

קוד Python לדוגמה שמבצע את המשימות האלה:

  • ייבוא של ספריות Python הנדרשות
  • קבלת אסימון גישה לאימות HTTP באמצעות Google Cloud CLI
  • הגדרת משתנים לפרויקט החיוב, למיקום ולהוראות המערכת
import json
import json as json_lib
import textwrap

import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
}

billing_project = 'YOUR-BILLING-PROJECT'
location = "global"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: המזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.
  • YOUR-SYSTEM-INSTRUCTIONS: הוראות למערכת שמנחות את התנהגות הסוכן ומאפשרות להתאים אותה לצורכי הנתונים שלכם. לדוגמה, אפשר להשתמש בהוראות מערכת כדי להגדיר מונחים עסקיים, לשלוט באורך התשובה או להגדיר את עיצוב הנתונים. מומלץ להגדיר את ההוראות למערכת באמצעות פורמט ה-YAML המומלץ במאמר איך כותבים הוראות למערכת בצורה יעילה כדי לספק הנחיות מפורטות ומובְנות.

אימות ל-Looker

אם אתם מתכננים להתחבר למקור נתונים של Looker, תצטרכו לבצע אימות למכונה של Looker.

שימוש במפתחות API

בדוגמת הקוד הבאה ב-Python אפשר לראות איך מאמתים את הסוכן למופע Looker באמצעות מפתחות API.

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-LOOKER-CLIENT-ID: מזהה הלקוח של מפתח Looker API שנוצר.
  • YOUR-LOOKER-CLIENT-SECRET: ה-client secret של מפתח Looker API שנוצר.

שימוש באסימוני גישה

בדוגמת הקוד הבאה ב-Python אפשר לראות איך מאמתים את הסוכן למופע Looker באמצעות טוקנים של גישה.

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-TOKEN: הערך access_token שיוצרים כדי לבצע אימות ל-Looker.

התחברות למקור נתונים

בקטעים הבאים מוסבר איך מגדירים את פרטי החיבור למקורות הנתונים של הסוכן. הסוכן יכול להתחבר לנתונים ב-Looker, ב-BigQuery או ב-Looker Studio.

חיבור לנתונים ב-Looker

בדוגמת הקוד הבאה מוגדר חיבור ל-Looker Explore. כדי ליצור חיבור למופע Looker, צריך לוודא שיצרתם מפתחות Looker API, כמו שמתואר במאמר אימות וחיבור למקור נתונים באמצעות Conversational Analytics API. אפשר להתחבר לעד חמישה Looker Explores בו-זמנית באמצעות Conversational Analytics API.

כשמתחברים למקור נתונים של Looker, חשוב לשים לב לנקודות הבאות:

  • אתם יכולים לשאול שאלות על כל דוח Explore שכלול בשיחה.
  • סוכן יכול לשלוח שאילתה רק למסמך Explore אחד בכל פעם. אי אפשר להריץ שאילתות בכמה דוחות Explore בו-זמנית.
  • נציג יכול לשאול כמה שאלות ב-Explore באותה שיחה.
  • סוכן יכול לשלוח שאילתות לכמה ניתוחים בשיחה שכוללת שאלות עם כמה חלקים, או בשיחות שכוללות שאלות המשך.

    לדוגמה: משתמש מחבר שני דוחות Explore, אחד בשם cat-explore ואחד בשם dog-explore. המשתמש מזין את השאלה 'מה גדול יותר: מספר החתולים או מספר הכלבים?' כך יישאלו שתי שאילתות: אחת לספירת מספר החתולים ב-cat-explore ואחת לספירת מספר הכלבים ב-dog-explore. הנציג משווה את המספר משתי השאילתות אחרי שהוא משלים את שתיהן.

בדוגמת הקוד הבאה מוגדר חיבור לכמה אפשרויות נוספות ב-Looker. כדי לשפר את הביצועים של הסוכן, אתם יכולים לספק שאילתות לדוגמה כהקשר מובנה לניתוחים שלכם. מידע נוסף זמין במאמר בנושא הגדרת הקשר של סוכן נתונים למקורות נתונים של Looker.

looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
    "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
       # Do not include the following line during agent creation
       "credentials": looker_credentials
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • https://your_company.looker.com: כתובת ה-URL המלאה של מכונת Looker.
  • your_model: השם של מודל LookML שכולל את כלי הניתוח שרוצים להתחבר אליו.
  • your_explore: השם של Looker Explore שרוצים שהסוכן לגישה לנתונים ישלח אליו שאילתה.
  • your_model_2: השם של מודל LookML השני שכולל את התכונה Explore שאליה רוצים להתחבר. אפשר לחזור על התהליך הזה כדי להוסיף משתנים למודלים נוספים, עד חמישה ניתוחים.
  • your_explore_2: השם של Looker Explore נוסף שרוצים שהסוכן לגישה לנתונים יריץ עליו שאילתה. אפשר לחזור על המשתנה הזה כדי לכלול עד חמישה ניתוחים.

התחברות לנתוני BigQuery

ב-Conversational Analytics API, אין מגבלות קשיחות על מספר הטבלאות ב-BigQuery שאפשר להתחבר אליהן. עם זאת, חיבור למספר גדול של טבלאות עלול להפחית את רמת הדיוק או לגרום לחריגה ממגבלת הטוקנים של הקלט במודל.

קוד לדוגמה הבא מגדיר חיבור לכמה טבלאות ב-BigQuery, והוא כולל דוגמאות לשדות אופציונליים של הקשר מובנה. כדי לשפר את הביצועים של הסוכן, אתם יכולים לספק הקשר מובנה לטבלאות BigQuery, כמו תיאורים של טבלאות ועמודות, מילים נרדפות, תגים ושאילתות לדוגמה. מידע נוסף זמין במאמר בנושא הגדרת הקשר של סוכן הנתונים למקורות נתונים של BigQuery.

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id",
                "schema": {
                    "description": "my_table_description",
                    "fields": [{
                        "name": "my_column_name",
                        "description": "my_column_description"
                    }]
                }
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

מחליפים את הערכים לדוגמה באופן הבא:

  • my_project_id: המזהה של הפרויקט Google Cloud שמכיל את מערך הנתונים והטבלה ב-BigQuery שאליהם רוצים להתחבר. כדי להתחבר למערך נתונים ציבורי, מציינים bigquery-public-data.
  • my_dataset_id: המזהה של מערך הנתונים ב-BigQuery.
  • my_table_id: המזהה של הטבלה ב-BigQuery.
  • my_table_description: תיאור אופציונלי של תוכן הטבלה והמטרה שלה.
  • my_column_name: שם העמודה בטבלה שרוצים לספק לה תיאור אופציונלי.
  • my_column_description: תיאור אופציונלי של התוכן והמטרה של העמודה.

חיבור לנתונים ב-Looker Studio

בדוגמת הקוד הבאה מוגדר חיבור למקור נתונים ב-Looker Studio.

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

מחליפים את studio_datasource_id במזהה של מקור הנתונים.

יצירת סוכן נתונים

קוד לדוגמה הבא מדגים איך ליצור את סוכן הנתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP POST לנקודת הקצה ליצירת סוכן נתונים. מטען הייעודי (payload) של הבקשה כולל את הפרטים הבאים:

  • השם המלא של המשאב של הסוכן. הערך הזה כולל את מזהה הפרויקט, המיקום ומזהה ייחודי של הסוכן.
  • תיאור של סוכן הנתונים.
  • ההקשר של סוכן הנתונים, כולל תיאור המערכת (מוגדר בקטע הגדרת ההגדרות הראשוניות והאימות) ומקור הנתונים שבו הסוכן משתמש (מוגדר בקטע קישור למקור נתונים).

אפשר גם להפעיל ניתוח מתקדם באמצעות Python על ידי הוספת הפרמטר options למטען הייעודי (payload) של הבקשה. מידע נוסף על הפרמטר options ועל האפשרויות שאפשר להגדיר לשיחה זמין במאמר REST Resource: projects.locations.dataAgents.

סינכרוני

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents:createSync"

data_agent_id = "data_agent_1"

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional

      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

אסינכרוני

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional

      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: מזהה ייחודי של סוכן הנתונים. הערך הזה משמש בשם המשאב של הסוכן וכפרמטר השאילתה של כתובת האתר data_agent_id.
  • This is the description of data_agent_1.: תיאור של סוכן הנתונים.

יצירת שיחה

בדוגמת הקוד הבאה אפשר לראות איך יוצרים שיחה עם סוכן הנתונים.

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
  • conversation_1: מזהה ייחודי של השיחה.

ניהול סוכני נתונים ושיחות

בדוגמאות הקוד הבאות אפשר לראות איך לנהל את סוכני הנתונים והשיחות באמצעות Conversational Analytics API. אפשר לבצע את המשימות הבאות:

קבלת סוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך מאחזרים סוכן נתונים קיים על ידי שליחת בקשת HTTP GET לכתובת ה-URL של משאב סוכן הנתונים.

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

בדוגמה הקודמת, מחליפים את data_agent_1 במזהה של סוכן הנתונים שרוצים לאחזר.

הצגת רשימה של סוכני נתונים

בדוגמת הקוד הבאה אפשר לראות איך לשלוח בקשת HTTP‏ GET לנקודת הקצה dataAgents כדי להציג רשימה של כל סוכני הנתונים בפרויקט נתון.

כדי לראות את רשימת כל הסוכנים, צריכה להיות לכם הרשאה geminidataanalytics.dataAgents.list בפרויקט. למידע נוסף על תפקידי IAM שכוללים את ההרשאה הזו, אפשר לעיין ברשימת התפקידים המוגדרים מראש.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

מחליפים את YOUR-BILLING-PROJECT במזהה פרויקט החיוב.

הצגת רשימה של סוכני נתונים שאפשר לגשת אליהם

בדוגמת הקוד הבאה מוצג איך לשלוח בקשת HTTP‏ GET לנקודת הקצה dataAgents:listAccessible כדי לקבל רשימה של כל סוכני הנתונים שאפשר לגשת אליהם בפרויקט נתון.

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • YOUR-CREATOR-FILTER: המסנן להחלה על סמך יוצר סוכן הנתונים. הערכים האפשריים כוללים NONE (ברירת מחדל), CREATOR_ONLY ו-NOT_CREATOR_ONLY.

עדכון סוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך מעדכנים סוכן נתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP PATCH לכתובת ה-URL של משאב סוכן הנתונים. מטען הייעודי (payload) של הבקשה כולל את הערכים החדשים של השדות שרוצים לשנות, והפרמטרים של הבקשה כוללים פרמטר updateMask שמציין את השדות שצריך לעדכן.

סינכרוני

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:updateSync"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

אסינכרוני

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים שרוצים לעדכן.
  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • Updated description of the data agent.: התיאור החדש של סוכן הנתונים.

קבלת מדיניות IAM עבור סוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך מאחזרים את מדיניות IAM של סוכן נתונים על ידי שליחת בקשת HTTP POST לכתובת ה-URL של סוכן הנתונים. המטען הייעודי (payload) של הבקשה כולל את הנתיב של סוכן הנתונים.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • data_agent_1: המזהה של סוכן הנתונים שעבורו רוצים לקבל את מדיניות ה-IAM.

הגדרת מדיניות IAM לסוכן נתונים

כדי לשתף סוכן, אפשר להשתמש בשיטה setIamPolicy כדי להקצות תפקידי IAM למשתמשים בסוכן ספציפי. בדוגמת הקוד הבאה אפשר לראות איך לבצע קריאה מסוג POST לכתובת ה-URL של סוכן הנתונים עם מטען ייעודי (payload) שכולל קישורים. הקישור מציין אילו תפקידים צריך להקצות לאילו משתמשים.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • data_agent_1: המזהה של סוכן הנתונים שעבורו רוצים להגדיר את מדיניות ה-IAM.
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: רשימה מופרדת בפסיקים של כתובות אימייל של משתמשים שרוצים להקצות להם את התפקיד שצוין.

מחיקה של סוכן נתונים

בדוגמת קוד לדוגמה הבאה אפשר לראות איך לבצע מחיקה עם יכולת שחזור של סוכן נתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP DELETE לכתובת ה-URL של משאב סוכן הנתונים. מחיקה רכה פירושה שהסוכן נמחק אבל עדיין אפשר לשחזר אותו תוך 30 יום.

סינכרוני

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:deleteSync"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

אסינכרוני

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • data_agent_1: המזהה של סוכן הנתונים שרוצים למחוק.

קבלת שיחה

קוד לדוגמה הבא מדגים איך לאחזר שיחה קיימת על ידי שליחת בקשת HTTP GET לכתובת ה-URL של משאב השיחה.

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • conversation_1: המזהה של השיחה שרוצים לאחזר.

רשימת השיחות

בדוגמת הקוד הבאה אפשר לראות איך לשלוח בקשת HTTP‏ GET לנקודת הקצה conversations כדי להציג רשימה של שיחות בפרויקט נתון.

כברירת מחדל, השיטה הזו מחזירה את השיחות שיצרתם. אדמינים (משתמשים עם תפקיד IAM‏ cloudaicompanion.topicAdmin) יכולים לראות את כל השיחות בפרויקט.

billing_project = "YOUR-BILLING-PROJECT"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את YOUR-BILLING-PROJECT במזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.

הצגת רשימת ההודעות בשיחה

בדוגמת הקוד הבאה מוצג איך לקבל רשימה של כל ההודעות בשיחה על ידי שליחת בקשת HTTP GET לנקודת הקצה messages של השיחה.

כדי להציג את רשימת ההודעות, צריכה להיות לכם הרשאה מסוג cloudaicompanion.topics.get בשיחה.

billing_project = "YOUR-BILLING-PROJECT"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • conversation_1: המזהה של השיחה שרוצים להציג את ההודעות שלה.

איך מוחקים שיחות

קוד לדוגמה הבא מדגים איך למחוק שיחה על ידי שליחת בקשת HTTP DELETE לכתובת ה-URL של משאב השיחה. אדמינים (משתמשים עם התפקיד cloudaicompanion.topicAdmin בניהול הזהויות והרשאות הגישה) או משתמשים עם cloudaicompanion.topics.delete ההרשאה בניהול הזהויות והרשאות הגישה יכולים למחוק שיחות בפרויקט.

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.delete(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation deleted successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while deleting conversation: {conversation_response.status_code}")
    print(conversation_response.text)

מחליפים את הערכים לדוגמה באופן הבא:

  • YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
  • conversation_1: המזהה של השיחה שרוצים למחוק.

איך לשאול שאלות באמצעות ה-API

אחרי שיוצרים סוכן נתונים ושיחה (בצ'אט עם שמירת מצב), אפשר לשלוח שאילתות לסוכן.

כששולחים שאילתה, ה-API מחזיר זרם של אובייקטים מסוג Message. הזרם הזה יכול להכיל סוגים שונים של הודעות, כולל טקסט, טבלאות נתונים ותרשימים. הודעות טקסט יכולות לספק תובנות לגבי ההיגיון של הסוכן, לדווח על ההתקדמות שלו או לתת את התשובה הסופית. המטרה של כל הודעת טקסט מצוינת על ידי הערך TextType שלה:

  • THOUGHT: מציג את תהליך החשיבה הפנימי של הסוכן בזמן שהוא מתכנן איך לענות על השאילתה. הודעות THOUGHT מספקות תובנות שלב אחר שלב לגבי תהליך החשיבה הרציונלית וקבלת ההחלטות של ה-Agent, והן כוללות שני חלקים: parts[0] הוא סיכום החשיבה, שכולל סיכום קצר של טקסט החשיבה המלא, וparts[1] הוא טקסט החשיבה המלא.
  • PROGRESS: דיווח על התקדמות הסוכן בפעולה, כמו אחזור נתונים או הפעלה של כלי. הערך הזה מוחזר רק למקורות נתונים של Looker, והוא מכיל שני חלקים: parts[0] הוא הסיכום, ו-parts[1] הוא הטקסט המלא של ההתקדמות.
  • FINAL_RESPONSE: מספקת את התשובה הסופית לשאילתה.

בדוגמאות שבקטע הבא נעשה שימוש בפונקציות העזר שמוגדרות במאמר פונקציות עזר של Python להזרמת תשובות בצ'אט כדי לעבד ולהציג כל הודעה בתגובה המוזרמת של ה-API. הוראות להצגת ההודעות האלה בממשק משתמש כשמשתמשים במקורות נתונים של Looker מופיעות במאמר הצגת תשובות של סוכנים למקורות נתונים של Looker.

שליחת שאלות בצ'אט עם שמירת מצב ובצ'אט ללא שמירת מצב

אפשר ליצור אינטראקציה עם ה-API במצבים העיקריים הבאים:

  • צ'אט עם שמירת מצב: Google Cloud שומר ומנהל את היסטוריית השיחות. צ'אט עם שמירת מצב הוא מטבעו רב-שלבי, כי ה-API שומר את ההקשר מההודעות הקודמות. צריך לשלוח רק את ההודעה הנוכחית בכל תור.
  • צ'אט ללא שמירת מצב: האפליקציה שלכם מנהלת את היסטוריית השיחות. צריך לכלול את ההודעות הקודמות הרלוונטיות בכל הודעה חדשה. דוגמאות מפורטות לאופן ניהול שיחות מרובות תורות במצב בלי שמירת מצב זמינות במאמר יצירת שיחה מרובת תורות בלי שמירת מצב.

בדוגמאות הבאות מוצגות בקשות POST לנקודת הקצה :chat, שמדגימות איך להשתמש ב-API לצ'אט עם שמירת מצב ולצ'אט בלי שמירת מצב.

צ'אט עם שמירת מצב

שליחת בקשה לצ'אט עם שמירת מצב עם הפניה לשיחה

בדוגמת הקוד הבאה אפשר לראות איך שואלים שאלות ב-API באמצעות השיחה שהגדרתם בשלבים הקודמים. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התשובה.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
  • conversation_1: מזהה ייחודי של השיחה.
  • ההנחיה לדוגמה הייתה Make a bar graph for the top 5 states by the total number of airports.

צ'אט ללא שמירת מצב

בדוגמאות הבאות מוצגות דרכים לשליחת בקשות בלי שמירת מצב באמצעות הפניה לסוכן נתונים או באמצעות הקשר מוטבע.

שליחת בקשת צ'אט בלי שמירת מצב עם הפניה לסוכן נתונים

בדוגמת הקוד הבאה אפשר לראות איך שואלים את ה-API שאלה בלי שמירת מצב באמצעות סוכן הנתונים שהגדרתם בשלבים הקודמים. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התשובה.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

מחליפים את הערכים לדוגמה באופן הבא:

  • data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
  • ההנחיה לדוגמה הייתה Make a bar graph for the top 5 states by the total number of airports.

שליחת בקשה לצ'אט בלי שמירת מצב עם הקשר מוטבע

בדוגמת הקוד הבאה אפשר לראות איך שואלים את ה-API שאלה בלי שמירת מצב באמצעות הקשר בתוך השורה. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התגובה, ומקור נתונים של BigQuery משמש כדוגמה.

אפשר גם להפעיל ניתוח מתקדם באמצעות Python על ידי הוספת הפרמטר options למטען הייעודי (payload) של הבקשה. מידע נוסף על הפרמטר options ועל האפשרויות שאפשר להגדיר לשיחה זמין בדף REST Resource: projects.locations.dataAgents.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

יצירת שיחה מרובת תפניות ללא שמירת מצב

כדי לשאול שאלות להמשך בשיחה בלי שמירת מצב, האפליקציה צריכה לנהל את ההקשר של השיחה על ידי שליחת היסטוריית ההודעות המלאה עם כל בקשה חדשה. בקטעים הבאים מוסבר איך להגדיר פונקציות עזר ולקרוא להן כדי ליצור שיחה מרובת תפניות:

שליחת בקשות מרובות תגובות

פונקציית העזרה הבאה multi_turn_Conversation מנהלת את ההקשר של השיחה על ידי אחסון ההודעות ברשימה. כך תוכלו לשלוח שאלות המשך שמבוססות על תגובות קודמות. במטען הייעודי (payload) של הפונקציה, אפשר להפנות לסוכן נתונים או לספק את מקור הנתונים ישירות באמצעות הקשר המוטבע.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

בדוגמה הקודמת, מחליפים את data_agent_1 במזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.

אפשר לקרוא לפונקציית העזר multi_turn_Conversation בכל תור של השיחה. קוד לדוגמה הבא מראה איך לשלוח בקשה ראשונית ואז בקשת המשך שמבוססת על התשובה הקודמת.

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

בדוגמה הקודמת, מחליפים את הערכים לדוגמה באופן הבא:

  • Which species of tree is most prevalent?: שאלה בשפה טבעית לשליחה לסוכן הנתונים.
  • Can you show me the results as a bar chart?: שאלה המשך שמבוססת על השאלה הקודמת או משפרת אותה.

עיבוד התשובות

הפונקציה הבאה get_stream_multi_turn מעבדת את התשובה של ה-API לסטרימינג. הפונקציה הזו דומה לפונקציית העזר get_stream, אבל היא שומרת את התגובה ברשימה conversation_messages כדי לשמור את הקשר השיחה לתור הבא.

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

דוגמת קוד מקצה לקצה

דוגמת הקוד הבאה ניתנת להרחבה ומכילה את כל המשימות שמוסברות במדריך הזה.

פיתוח סוכן נתונים באמצעות HTTP ו-Python

    import json
    import json as json_lib
    import textwrap

    import altair as alt
    import google.auth
    from google.auth.transport.requests import Request
    import IPython
    from IPython.display import display, HTML
    import pandas as pd
    from pygments import formatters, highlight, lexers
    import requests

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
        # Do not include the following line during agent creation
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references": [
                {
                  "studio_datasource_id": "studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

דוגמת הקוד הבאה ניתנת להרחבה ומכילה את פונקציות העזר של Python שמשמשות לסטרימינג של תשובות בצ'אט.

פונקציות עזר של Python להזרמת תשובות בצ'אט

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      full_text = "".join(parts)
      if "\n" not in full_text and len(full_text) > 80:
        wrapped_text = textwrap.fill(full_text, width=80)
        print(wrapped_text)
      else:
        print(full_text)

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        if 'question' in query:
          print('Question: {}'.format(query['question']))
        if 'datasources' in query:
          print('Data sources:')
          for datasource in query['datasources']:
            display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''