בדף הזה מוסבר איך להשתמש ב-Python כדי לשלוח בקשות HTTP אל Conversational Analytics API (שניתן לגשת אליו דרך geminidataanalytics.googleapis.com).
בדף הזה יש דוגמה לקוד Python שמראה איך לבצע את הפעולות הבאות:
- הגדרת ההגדרות הראשוניות והאימות
- חיבור למקור נתונים של Looker, BigQuery או Looker Studio
- יצירת סוכן נתונים
- איך יוצרים שיחה
- ניהול סוכני נתונים ושיחות
- שימוש ב-API כדי לשאול שאלות
- יצירת שיחה מרובת תפניות ללא שמירת מצב
בסוף הדף מופיעה גרסה מלאה של קוד לדוגמה, וגם פונקציות עזר שמשמשות להזרמת התגובה של ה-API.
הגדרת ההגדרות הראשוניות והאימות
קוד Python לדוגמה שמבצע את המשימות האלה:
- ייבוא של ספריות Python הנדרשות
- קבלת אסימון גישה לאימות HTTP באמצעות Google Cloud CLI
- הגדרת משתנים לפרויקט החיוב, למיקום ולהוראות המערכת
import json
import json as json_lib
import textwrap
import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests
from google.colab import auth
auth.authenticate_user()
access_token = !gcloud auth application-default print-access-token
headers = {
"Authorization": f"Bearer {access_token[0]}",
"Content-Type": "application/json",
"x-server-timeout": "300", # Custom timeout up to 600s
}
billing_project = 'YOUR-BILLING-PROJECT'
location = "global"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: המזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.
- YOUR-SYSTEM-INSTRUCTIONS: הוראות למערכת שמנחות את התנהגות הסוכן ומאפשרות להתאים אותה לצורכי הנתונים שלכם. לדוגמה, אפשר להשתמש בהוראות מערכת כדי להגדיר מונחים עסקיים, לשלוט באורך התשובה או להגדיר את עיצוב הנתונים. מומלץ להגדיר את ההוראות למערכת באמצעות פורמט ה-YAML המומלץ במאמר איך כותבים הוראות למערכת בצורה יעילה כדי לספק הנחיות מפורטות ומובְנות.
אימות ל-Looker
אם אתם מתכננים להתחבר למקור נתונים של Looker, תצטרכו לבצע אימות למכונה של Looker.
שימוש במפתחות API
בדוגמת הקוד הבאה ב-Python אפשר לראות איך מאמתים את הסוכן למופע Looker באמצעות מפתחות API.
looker_credentials = {
"oauth": {
"secret": {
"client_id": "YOUR-LOOKER-CLIENT-ID",
"client_secret": "YOUR-LOOKER-CLIENT-SECRET",
}
}
}
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-LOOKER-CLIENT-ID: מזהה הלקוח של מפתח Looker API שנוצר.
- YOUR-LOOKER-CLIENT-SECRET: ה-client secret של מפתח Looker API שנוצר.
שימוש באסימוני גישה
בדוגמת הקוד הבאה ב-Python אפשר לראות איך מאמתים את הסוכן למופע Looker באמצעות טוקנים של גישה.
looker_credentials = {
"oauth": {
"token": {
"access_token": "YOUR-TOKEN",
}
}
}
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-TOKEN: הערך
access_tokenשיוצרים כדי לבצע אימות ל-Looker.
התחברות למקור נתונים
בקטעים הבאים מוסבר איך מגדירים את פרטי החיבור למקורות הנתונים של הסוכן. הסוכן יכול להתחבר לנתונים ב-Looker, ב-BigQuery או ב-Looker Studio.
חיבור לנתונים ב-Looker
בדוגמת הקוד הבאה מוגדר חיבור ל-Looker Explore. כדי ליצור חיבור למופע Looker, צריך לוודא שיצרתם מפתחות Looker API, כמו שמתואר במאמר אימות וחיבור למקור נתונים באמצעות Conversational Analytics API. אפשר להתחבר לעד חמישה Looker Explores בו-זמנית באמצעות Conversational Analytics API.
כשמתחברים למקור נתונים של Looker, חשוב לשים לב לנקודות הבאות:
- אתם יכולים לשאול שאלות על כל דוח Explore שכלול בשיחה.
- סוכן יכול לשלוח שאילתה רק למסמך Explore אחד בכל פעם. אי אפשר להריץ שאילתות בכמה דוחות Explore בו-זמנית.
- נציג יכול לשאול כמה שאלות ב-Explore באותה שיחה.
סוכן יכול לשלוח שאילתות לכמה ניתוחים בשיחה שכוללת שאלות עם כמה חלקים, או בשיחות שכוללות שאלות המשך.
לדוגמה: משתמש מחבר שני דוחות Explore, אחד בשם
cat-exploreואחד בשםdog-explore. המשתמש מזין את השאלה 'מה גדול יותר: מספר החתולים או מספר הכלבים?' כך יישאלו שתי שאילתות: אחת לספירת מספר החתולים ב-cat-exploreואחת לספירת מספר הכלבים ב-dog-explore. הנציג משווה את המספר משתי השאילתות אחרי שהוא משלים את שתיהן.
בדוגמת הקוד הבאה מוגדר חיבור לכמה אפשרויות נוספות ב-Looker. כדי לשפר את הביצועים של הסוכן, אתם יכולים לספק שאילתות לדוגמה כהקשר מובנה לניתוחים שלכם. מידע נוסף זמין במאמר בנושא הגדרת הקשר של סוכן נתונים למקורות נתונים של Looker.
looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
"looker": {
"explore_references": [
{
"looker_instance_uri": looker_instance_uri,
"lookml_model": "your_model",
"explore": "your_explore",
},
{
"looker_instance_uri": looker_instance_uri,
"lookml_model": "your_model_2",
"explore": "your_explore_2",
},
# Add up to 5 total Explore references
],
# Do not include the following line during agent creation
"credentials": looker_credentials
}
}
מחליפים את הערכים לדוגמה באופן הבא:
- https://your_company.looker.com: כתובת ה-URL המלאה של מכונת Looker.
- your_model: השם של מודל LookML שכולל את כלי הניתוח שרוצים להתחבר אליו.
- your_explore: השם של Looker Explore שרוצים שהסוכן לגישה לנתונים ישלח אליו שאילתה.
- your_model_2: השם של מודל LookML השני שכולל את התכונה Explore שאליה רוצים להתחבר. אפשר לחזור על התהליך הזה כדי להוסיף משתנים למודלים נוספים, עד חמישה ניתוחים.
- your_explore_2: השם של Looker Explore נוסף שרוצים שהסוכן לגישה לנתונים יריץ עליו שאילתה. אפשר לחזור על המשתנה הזה כדי לכלול עד חמישה ניתוחים.
התחברות לנתוני BigQuery
ב-Conversational Analytics API, אין מגבלות קשיחות על מספר הטבלאות ב-BigQuery שאפשר להתחבר אליהן. עם זאת, חיבור למספר גדול של טבלאות עלול להפחית את רמת הדיוק או לגרום לחריגה ממגבלת הטוקנים של הקלט במודל.
קוד לדוגמה הבא מגדיר חיבור לכמה טבלאות ב-BigQuery, והוא כולל דוגמאות לשדות אופציונליים של הקשר מובנה. כדי לשפר את הביצועים של הסוכן, אתם יכולים לספק הקשר מובנה לטבלאות BigQuery, כמו תיאורים של טבלאות ועמודות, מילים נרדפות, תגים ושאילתות לדוגמה. מידע נוסף זמין במאמר בנושא הגדרת הקשר של סוכן הנתונים למקורות נתונים של BigQuery.
bigquery_data_sources = {
"bq": {
"tableReferences": [
{
"projectId": "my_project_id",
"datasetId": "my_dataset_id",
"tableId": "my_table_id",
"schema": {
"description": "my_table_description",
"fields": [{
"name": "my_column_name",
"description": "my_column_description"
}]
}
},
{
"projectId": "my_project_id_2",
"datasetId": "my_dataset_id_2",
"tableId": "my_table_id_2"
},
{
"projectId": "my_project_id_3",
"datasetId": "my_dataset_id_3",
"tableId": "my_table_id_3"
},
]
}
}
מחליפים את הערכים לדוגמה באופן הבא:
- my_project_id: המזהה של הפרויקט Google Cloud שמכיל את מערך הנתונים והטבלה ב-BigQuery שאליהם רוצים להתחבר. כדי להתחבר למערך נתונים ציבורי, מציינים
bigquery-public-data. - my_dataset_id: המזהה של מערך הנתונים ב-BigQuery.
- my_table_id: המזהה של הטבלה ב-BigQuery.
- my_table_description: תיאור אופציונלי של תוכן הטבלה והמטרה שלה.
- my_column_name: שם העמודה בטבלה שרוצים לספק לה תיאור אופציונלי.
- my_column_description: תיאור אופציונלי של התוכן והמטרה של העמודה.
חיבור לנתונים ב-Looker Studio
בדוגמת הקוד הבאה מוגדר חיבור למקור נתונים ב-Looker Studio.
looker_studio_data_source = {
"studio":{
"studio_references": [
{
"studio_datasource_id": "studio_datasource_id"
}
]
}
}
מחליפים את studio_datasource_id במזהה של מקור הנתונים.
יצירת סוכן נתונים
קוד לדוגמה הבא מדגים איך ליצור את סוכן הנתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP POST לנקודת הקצה ליצירת סוכן נתונים. מטען הייעודי (payload) של הבקשה כולל את הפרטים הבאים:
- השם המלא של המשאב של הסוכן. הערך הזה כולל את מזהה הפרויקט, המיקום ומזהה ייחודי של הסוכן.
- תיאור של סוכן הנתונים.
- ההקשר של סוכן הנתונים, כולל תיאור המערכת (מוגדר בקטע הגדרת ההגדרות הראשוניות והאימות) ומקור הנתונים שבו הסוכן משתמש (מוגדר בקטע קישור למקור נתונים).
אפשר גם להפעיל ניתוח מתקדם באמצעות Python על ידי הוספת הפרמטר options למטען הייעודי (payload) של הבקשה. מידע נוסף על הפרמטר options ועל האפשרויות שאפשר להגדיר לשיחה זמין במאמר REST Resource: projects.locations.dataAgents.
סינכרוני
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents:createSync"
data_agent_id = "data_agent_1"
data_agent_payload = {
"name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
"description": "This is the description of data_agent_1.", # Optional
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
}
params = {"data_agent_id": data_agent_id} # Optional
data_agent_response = requests.post(
data_agent_url, params=params, json=data_agent_payload, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent created successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error creating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
אסינכרוני
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_id = "data_agent_1"
data_agent_payload = {
"name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
"description": "This is the description of data_agent_1.", # Optional
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
}
params = {"data_agent_id": data_agent_id} # Optional
data_agent_response = requests.post(
data_agent_url, params=params, json=data_agent_payload, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent created successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error creating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- data_agent_1: מזהה ייחודי של סוכן הנתונים. הערך הזה משמש בשם המשאב של הסוכן וכפרמטר השאילתה של כתובת האתר
data_agent_id. - This is the description of data_agent_1.: תיאור של סוכן הנתונים.
יצירת שיחה
בדוגמת הקוד הבאה אפשר לראות איך יוצרים שיחה עם סוכן הנתונים.
conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
conversation_payload = {
"agents": [
f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
],
"name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
"conversation_id": conversation_id
}
conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)
if conversation_response.status_code == 200:
print("Conversation created successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error creating Conversation: {conversation_response.status_code}")
print(conversation_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
- conversation_1: מזהה ייחודי של השיחה.
ניהול סוכני נתונים ושיחות
בדוגמאות הקוד הבאות אפשר לראות איך לנהל את סוכני הנתונים והשיחות באמצעות Conversational Analytics API. אפשר לבצע את המשימות הבאות:
- קבלת סוכן נתונים
- רשימת סוכני נתונים
- רשימת סוכני נתונים שאפשר לגשת אליהם
- עדכון סוכן נתונים
- קבלת מדיניות IAM עבור סוכן נתונים
- הגדרת מדיניות IAM לסוכן נתונים
- מחיקת סוכן נתונים
- איך מקבלים שיחה
- רשימת שיחות
- איך רואים את רשימת ההודעות בשיחה
קבלת סוכן נתונים
בדוגמת הקוד הבאה אפשר לראות איך מאחזרים סוכן נתונים קיים על ידי שליחת בקשת HTTP GET לכתובת ה-URL של משאב סוכן הנתונים.
data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent_response = requests.get(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Fetched Data Agent successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error: {data_agent_response.status_code}")
print(data_agent_response.text)
בדוגמה הקודמת, מחליפים את data_agent_1 במזהה של סוכן הנתונים שרוצים לאחזר.
הצגת רשימה של סוכני נתונים
בדוגמת הקוד הבאה אפשר לראות איך לשלוח בקשת HTTP GET לנקודת הקצה dataAgents כדי להציג רשימה של כל סוכני הנתונים בפרויקט נתון.
כדי לראות את רשימת כל הסוכנים, צריכה להיות לכם הרשאה geminidataanalytics.dataAgents.list בפרויקט. למידע נוסף על תפקידי IAM שכוללים את ההרשאה הזו, אפשר לעיין ברשימת התפקידים המוגדרים מראש.
billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_response = requests.get(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agents Listed successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Listing Data Agents: {data_agent_response.status_code}")
מחליפים את YOUR-BILLING-PROJECT במזהה פרויקט החיוב.
הצגת רשימה של סוכני נתונים שאפשר לגשת אליהם
בדוגמת הקוד הבאה מוצג איך לשלוח בקשת HTTP GET לנקודת הקצה dataAgents:listAccessible כדי לקבל רשימה של כל סוכני הנתונים שאפשר לגשת אליהם בפרויקט נתון.
billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"
params = {
"creator_filter": creator_filter
}
data_agent_response = requests.get(
data_agent_url, headers=headers, params=params
)
if data_agent_response.status_code == 200:
print("Accessible Data Agents Listed successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- YOUR-CREATOR-FILTER: המסנן להחלה על סמך יוצר סוכן הנתונים. הערכים האפשריים כוללים
NONE(ברירת מחדל),CREATOR_ONLYו-NOT_CREATOR_ONLY.
עדכון סוכן נתונים
בדוגמת הקוד הבאה אפשר לראות איך מעדכנים סוכן נתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP PATCH לכתובת ה-URL של משאב סוכן הנתונים. מטען הייעודי (payload) של הבקשה כולל את הערכים החדשים של השדות שרוצים לשנות, והפרמטרים של הבקשה כוללים פרמטר updateMask שמציין את השדות שצריך לעדכן.
סינכרוני
data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:updateSync"
payload = {
"description": "Updated description of the data agent.",
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction
}
},
}
fields = ["description", "data_analytics_agent"]
params = {
"updateMask": ",".join(fields)
}
data_agent_response = requests.patch(
data_agent_url, headers=headers, params=params, json=payload
)
if data_agent_response.status_code == 200:
print("Data Agent updated successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Updating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
אסינכרוני
data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
payload = {
"description": "Updated description of the data agent.",
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction
}
},
}
fields = ["description", "data_analytics_agent"]
params = {
"updateMask": ",".join(fields)
}
data_agent_response = requests.patch(
data_agent_url, headers=headers, params=params, json=payload
)
if data_agent_response.status_code == 200:
print("Data Agent updated successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Updating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- data_agent_1: המזהה של סוכן הנתונים שרוצים לעדכן.
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- Updated description of the data agent.: התיאור החדש של סוכן הנתונים.
קבלת מדיניות IAM עבור סוכן נתונים
בדוגמת הקוד הבאה אפשר לראות איך מאחזרים את מדיניות IAM של סוכן נתונים על ידי שליחת בקשת HTTP POST לכתובת ה-URL של סוכן הנתונים. המטען הייעודי (payload) של הבקשה כולל את הנתיב של סוכן הנתונים.
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"
# Request body
payload = {
"resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}
data_agent_response = requests.post(
data_agent_url, headers=headers, json=payload
)
if data_agent_response.status_code == 200:
print("IAM Policy fetched successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error fetching IAM policy: {data_agent_response.status_code}")
print(data_agent_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- data_agent_1: המזהה של סוכן הנתונים שעבורו רוצים לקבל את מדיניות ה-IAM.
הגדרת מדיניות IAM לסוכן נתונים
כדי לשתף סוכן, אפשר להשתמש בשיטה setIamPolicy כדי להקצות תפקידי IAM למשתמשים בסוכן ספציפי. בדוגמת הקוד הבאה אפשר לראות איך לבצע קריאה מסוג POST לכתובת ה-URL של סוכן הנתונים עם מטען ייעודי (payload) שכולל קישורים. הקישור מציין אילו תפקידים צריך להקצות לאילו משתמשים.
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"
# Request body
payload = {
"policy": {
"bindings": [
{
"role": role,
"members": [
f"user:{i.strip()}" for i in users.split(",")
]
}
]
}
}
data_agent_response = requests.post(
data_agent_url, headers=headers, json=payload
)
if data_agent_response.status_code == 200:
print("IAM Policy set successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error setting IAM policy: {data_agent_response.status_code}")
print(data_agent_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- data_agent_1: המזהה של סוכן הנתונים שעבורו רוצים להגדיר את מדיניות ה-IAM.
- 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: רשימה מופרדת בפסיקים של כתובות אימייל של משתמשים שרוצים להקצות להם את התפקיד שצוין.
מחיקה של סוכן נתונים
בדוגמת קוד לדוגמה הבאה אפשר לראות איך לבצע מחיקה עם יכולת שחזור של סוכן נתונים באופן סינכרוני או אסינכרוני על ידי שליחת בקשת HTTP DELETE לכתובת ה-URL של משאב סוכן הנתונים. מחיקה רכה פירושה שהסוכן נמחק אבל עדיין אפשר לשחזר אותו תוך 30 יום.
סינכרוני
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:deleteSync"
data_agent_response = requests.delete(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent deleted successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
אסינכרוני
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent_response = requests.delete(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent deleted successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- data_agent_1: המזהה של סוכן הנתונים שרוצים למחוק.
קבלת שיחה
קוד לדוגמה הבא מדגים איך לאחזר שיחה קיימת על ידי שליחת בקשת HTTP GET לכתובת ה-URL של משאב השיחה.
billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- conversation_1: המזהה של השיחה שרוצים לאחזר.
רשימת השיחות
בדוגמת הקוד הבאה אפשר לראות איך לשלוח בקשת HTTP GET לנקודת הקצה conversations כדי להציג רשימה של שיחות בפרויקט נתון.
כברירת מחדל, השיטה הזו מחזירה את השיחות שיצרתם. אדמינים (משתמשים עם תפקיד IAM cloudaicompanion.topicAdmin) יכולים לראות את כל השיחות בפרויקט.
billing_project = "YOUR-BILLING-PROJECT"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
מחליפים את YOUR-BILLING-PROJECT במזהה של פרויקט החיוב שבו הפעלתם את ממשקי ה-API הנדרשים.
הצגת רשימת ההודעות בשיחה
בדוגמת הקוד הבאה מוצג איך לקבל רשימה של כל ההודעות בשיחה על ידי שליחת בקשת HTTP GET לנקודת הקצה messages של השיחה.
כדי להציג את רשימת ההודעות, צריכה להיות לכם הרשאה מסוג cloudaicompanion.topics.get בשיחה.
billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- conversation_1: המזהה של השיחה שרוצים להציג את ההודעות שלה.
איך מוחקים שיחות
קוד לדוגמה הבא מדגים איך למחוק שיחה על ידי שליחת בקשת HTTP DELETE לכתובת ה-URL של משאב השיחה. אדמינים (משתמשים עם התפקיד cloudaicompanion.topicAdmin בניהול הזהויות והרשאות הגישה) או משתמשים עם cloudaicompanion.topics.delete ההרשאה בניהול הזהויות והרשאות הגישה יכולים למחוק שיחות בפרויקט.
billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
conversation_response = requests.delete(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation deleted successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while deleting conversation: {conversation_response.status_code}")
print(conversation_response.text)
מחליפים את הערכים לדוגמה באופן הבא:
- YOUR-BILLING-PROJECT: מזהה הפרויקט לחיוב.
- conversation_1: המזהה של השיחה שרוצים למחוק.
איך לשאול שאלות באמצעות ה-API
אחרי שיוצרים סוכן נתונים ושיחה (בצ'אט עם שמירת מצב), אפשר לשלוח שאילתות לסוכן.
כששולחים שאילתה, ה-API מחזיר זרם של אובייקטים מסוג Message. הזרם הזה יכול להכיל סוגים שונים של הודעות, כולל טקסט, טבלאות נתונים ותרשימים. הודעות טקסט יכולות לספק תובנות לגבי ההיגיון של הסוכן, לדווח על ההתקדמות שלו או לתת את התשובה הסופית. המטרה של כל הודעת טקסט מצוינת על ידי הערך TextType שלה:
-
THOUGHT: מציג את תהליך החשיבה הפנימי של הסוכן בזמן שהוא מתכנן איך לענות על השאילתה. הודעותTHOUGHTמספקות תובנות שלב אחר שלב לגבי תהליך החשיבה הרציונלית וקבלת ההחלטות של ה-Agent, והן כוללות שני חלקים:parts[0]הוא סיכום החשיבה, שכולל סיכום קצר של טקסט החשיבה המלא, וparts[1]הוא טקסט החשיבה המלא. -
PROGRESS: דיווח על התקדמות הסוכן בפעולה, כמו אחזור נתונים או הפעלה של כלי. הערך הזה מוחזר רק למקורות נתונים של Looker, והוא מכיל שני חלקים:parts[0]הוא הסיכום, ו-parts[1]הוא הטקסט המלא של ההתקדמות. -
FINAL_RESPONSE: מספקת את התשובה הסופית לשאילתה.
בדוגמאות שבקטע הבא נעשה שימוש בפונקציות העזר שמוגדרות במאמר פונקציות עזר של Python להזרמת תשובות בצ'אט כדי לעבד ולהציג כל הודעה בתגובה המוזרמת של ה-API. הוראות להצגת ההודעות האלה בממשק משתמש כשמשתמשים במקורות נתונים של Looker מופיעות במאמר הצגת תשובות של סוכנים למקורות נתונים של Looker.
שליחת שאלות בצ'אט עם שמירת מצב ובצ'אט ללא שמירת מצב
אפשר ליצור אינטראקציה עם ה-API במצבים העיקריים הבאים:
- צ'אט עם שמירת מצב: Google Cloud שומר ומנהל את היסטוריית השיחות. צ'אט עם שמירת מצב הוא מטבעו רב-שלבי, כי ה-API שומר את ההקשר מההודעות הקודמות. צריך לשלוח רק את ההודעה הנוכחית בכל תור.
- צ'אט ללא שמירת מצב: האפליקציה שלכם מנהלת את היסטוריית השיחות. צריך לכלול את ההודעות הקודמות הרלוונטיות בכל הודעה חדשה. דוגמאות מפורטות לאופן ניהול שיחות מרובות תורות במצב בלי שמירת מצב זמינות במאמר יצירת שיחה מרובת תורות בלי שמירת מצב.
בדוגמאות הבאות מוצגות בקשות POST לנקודת הקצה :chat, שמדגימות איך להשתמש ב-API לצ'אט עם שמירת מצב ולצ'אט בלי שמירת מצב.
צ'אט עם שמירת מצב
שליחת בקשה לצ'אט עם שמירת מצב עם הפניה לשיחה
בדוגמת הקוד הבאה אפשר לראות איך שואלים שאלות ב-API באמצעות השיחה שהגדרתם בשלבים הקודמים. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התשובה.
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"conversation_reference": {
"conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
מחליפים את הערכים לדוגמה באופן הבא:
- data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
- conversation_1: מזהה ייחודי של השיחה.
- ההנחיה לדוגמה הייתה
Make a bar graph for the top 5 states by the total number of airports.
צ'אט ללא שמירת מצב
בדוגמאות הבאות מוצגות דרכים לשליחת בקשות בלי שמירת מצב באמצעות הפניה לסוכן נתונים או באמצעות הקשר מוטבע.
שליחת בקשת צ'אט בלי שמירת מצב עם הפניה לסוכן נתונים
בדוגמת הקוד הבאה אפשר לראות איך שואלים את ה-API שאלה בלי שמירת מצב באמצעות סוכן הנתונים שהגדרתם בשלבים הקודמים. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התשובה.
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
מחליפים את הערכים לדוגמה באופן הבא:
- data_agent_1: המזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
- ההנחיה לדוגמה הייתה
Make a bar graph for the top 5 states by the total number of airports.
שליחת בקשה לצ'אט בלי שמירת מצב עם הקשר מוטבע
בדוגמת הקוד הבאה אפשר לראות איך שואלים את ה-API שאלה בלי שמירת מצב באמצעות הקשר בתוך השורה. בדוגמה הזו נעשה שימוש בפונקציית העזר get_stream כדי להזרים את התגובה, ומקור נתונים של BigQuery משמש כדוגמה.
אפשר גם להפעיל ניתוח מתקדם באמצעות Python על ידי הוספת הפרמטר options למטען הייעודי (payload) של הבקשה. מידע נוסף על הפרמטר options ועל האפשרויות שאפשר להגדיר לשיחה זמין בדף REST Resource: projects.locations.dataAgents.
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"inline_context": {
"datasource_references": bigquery_data_sources,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
יצירת שיחה מרובת תפניות ללא שמירת מצב
כדי לשאול שאלות להמשך בשיחה בלי שמירת מצב, האפליקציה צריכה לנהל את ההקשר של השיחה על ידי שליחת היסטוריית ההודעות המלאה עם כל בקשה חדשה. בקטעים הבאים מוסבר איך להגדיר פונקציות עזר ולקרוא להן כדי ליצור שיחה מרובת תפניות:
שליחת בקשות מרובות תגובות
פונקציית העזרה הבאה multi_turn_Conversation מנהלת את ההקשר של השיחה על ידי אחסון ההודעות ברשימה. כך תוכלו לשלוח שאלות המשך שמבוססות על תגובות קודמות. במטען הייעודי (payload) של הפונקציה, אפשר להפנות לסוכן נתונים או לספק את מקור הנתונים ישירות באמצעות הקשר המוטבע.
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"
# List that is used to track previous turns and is reused across requests
conversation_messages = []
data_agent_id = "data_agent_1"
# Helper function for calling the API
def multi_turn_Conversation(msg):
userMessage = {
"userMessage": {
"text": msg
}
}
# Send a multi-turn request by including previous turns and the new message
conversation_messages.append(userMessage)
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": conversation_messages,
# Use a data agent reference
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
},
# Use inline context
# "inline_context": {
# "datasource_references": bigquery_data_sources,
# }
}
# Call the get_stream_multi_turn helper function to stream the response
get_stream_multi_turn(chat_url, chat_payload, conversation_messages)
בדוגמה הקודמת, מחליפים את data_agent_1 במזהה של סוכן הנתונים, כפי שמוגדר בבלוק הקוד לדוגמה במאמר יצירת סוכן נתונים.
אפשר לקרוא לפונקציית העזר multi_turn_Conversation בכל תור של השיחה. קוד לדוגמה הבא מראה איך לשלוח בקשה ראשונית ואז בקשת המשך שמבוססת על התשובה הקודמת.
# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")
# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")
בדוגמה הקודמת, מחליפים את הערכים לדוגמה באופן הבא:
- Which species of tree is most prevalent?: שאלה בשפה טבעית לשליחה לסוכן הנתונים.
- Can you show me the results as a bar chart?: שאלה המשך שמבוססת על השאלה הקודמת או משפרת אותה.
עיבוד התשובות
הפונקציה הבאה get_stream_multi_turn מעבדת את התשובה של ה-API לסטרימינג. הפונקציה הזו דומה לפונקציית העזר get_stream, אבל היא שומרת את התגובה ברשימה conversation_messages כדי לשמור את הקשר השיחה לתור הבא.
def get_stream_multi_turn(url, json, conversation_messages):
s = requests.Session()
acc = ''
with s.post(url, json=json, headers=headers, stream=True) as resp:
for line in resp.iter_lines():
if not line:
continue
decoded_line = str(line, encoding='utf-8')
if decoded_line == '[{':
acc = '{'
elif decoded_line == '}]':
acc += '}'
elif decoded_line == ',':
continue
else:
acc += decoded_line
if not is_json(acc):
continue
data_json = json_lib.loads(acc)
# Store the response that will be used in the next iteration
conversation_messages.append(data_json)
if not 'systemMessage' in data_json:
if 'error' in data_json:
handle_error(data_json['error'])
continue
if 'text' in data_json['systemMessage']:
handle_text_response(data_json['systemMessage']['text'])
elif 'schema' in data_json['systemMessage']:
handle_schema_response(data_json['systemMessage']['schema'])
elif 'data' in data_json['systemMessage']:
handle_data_response(data_json['systemMessage']['data'])
elif 'chart' in data_json['systemMessage']:
handle_chart_response(data_json['systemMessage']['chart'])
else:
colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
print(colored_json)
print('\n')
acc = ''
דוגמת קוד מקצה לקצה
דוגמת הקוד הבאה ניתנת להרחבה ומכילה את כל המשימות שמוסברות במדריך הזה.
פיתוח סוכן נתונים באמצעות HTTP ו-Python
import json import json as json_lib import textwrap import altair as alt import google.auth from google.auth.transport.requests import Request import IPython from IPython.display import display, HTML import pandas as pd from pygments import formatters, highlight, lexers import requests from google.colab import auth auth.authenticate_user() access_token = !gcloud auth application-default print-access-token headers = { "Authorization": f"Bearer {access_token[0]}", "Content-Type": "application/json", "x-server-timeout": "300", # Custom timeout up to 600s } ################### Data source details ################### billing_project = "your_billing_project" location = "global" system_instruction = "Help the user in analyzing their data" # BigQuery data source bigquery_data_sources = { "bq": { "tableReferences": [ { "projectId": "bigquery-public-data", "datasetId": "san_francisco", "tableId": "street_trees" } ] } } # Looker data source looker_credentials = { "oauth": { "secret": { "client_id": "your_looker_client_id", "client_secret": "your_looker_client_secret", } } } # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block. # looker_credentials = { # "oauth": { # "token": { # "access_token": "your_looker_access_token", # } # } # } looker_data_source = { "looker": { "explore_references": [ { "looker_instance_uri": looker_instance_uri, "lookml_model": "your_model", "explore": "your_explore", }, { "looker_instance_uri": looker_instance_uri, "lookml_model": "your_model_2", "explore": "your_explore_2", }, # Add up to 5 total Explore references ], # Do not include the following line during agent creation # "credentials": looker_credentials } # Looker Studio data source looker_studio_data_source = { "studio":{ "studio_references": [ { "studio_datasource_id": "studio_datasource_id" } ] } } ################### Create data agent ################### data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents" data_agent_id = "data_agent_1" data_agent_payload = { "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional "description": "This is the description of data_agent.", # Optional "data_analytics_agent": { "published_context": { "datasource_references": bigquery_data_sources, "system_instruction": system_instruction, # Optional: To enable advanced analysis with Python, include the following options block: "options": { "analysis": { "python": { "enabled": True } } } } } } params = {"data_agent_id": data_agent_id} # Optional data_agent_response = requests.post( data_agent_url, params=params, json=data_agent_payload, headers=headers ) if data_agent_response.status_code == 200: print("Data Agent created successfully!") print(json.dumps(data_agent_response.json(), indent=2)) else: print(f"Error creating Data Agent: {data_agent_response.status_code}") print(data_agent_response.text) ################### Create conversation ################### conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations" data_agent_id = "data_agent_1" conversation_id = "conversation _1" conversation_payload = { "agents": [ f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}" ], "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}" } params = { "conversation_id": conversation_id } conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload) if conversation_response.status_code == 200: print("Conversation created successfully!") print(json.dumps(conversation_response.json(), indent=2)) else: print(f"Error creating Conversation: {conversation_response.status_code}") print(conversation_response.text) ################### Chat with the API by using conversation (stateful) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" conversation_id = "conversation _1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "conversation_reference": { "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}", "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using dataAgents (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using inline context (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "inline_context": { "datasource_references": bigquery_data_sources, # Optional - if wanting to use advanced analysis with python "options": { "analysis": { "python": { "enabled": True } } } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Multi-turn conversation ################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat" # List that is used to track previous turns and is reused across requests conversation_messages = [] data_agent_id = "data_agent_1" # Helper function for calling the API def multi_turn_Conversation(msg): userMessage = { "userMessage": { "text": msg } } # Send a multi-turn request by including previous turns and the new message conversation_messages.append(userMessage) # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": conversation_messages, # Use a data agent reference "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials }, # Use inline context # "inline_context": { # "datasource_references": bigquery_data_sources, # } } # Call the get_stream_multi_turn helper function to stream the response get_stream_multi_turn(chat_url, chat_payload, conversation_messages) # Send first-turn request multi_turn_Conversation("Which species of tree is most prevalent?") # Send follow-up-turn request multi_turn_Conversation("Can you show me the results as a bar chart?")
דוגמת הקוד הבאה ניתנת להרחבה ומכילה את פונקציות העזר של Python שמשמשות לסטרימינג של תשובות בצ'אט.
פונקציות עזר של Python להזרמת תשובות בצ'אט
def is_json(str): try: json_object = json_lib.loads(str) except ValueError as e: return False return True def handle_text_response(resp): parts = resp['parts'] full_text = "".join(parts) if "\n" not in full_text and len(full_text) > 80: wrapped_text = textwrap.fill(full_text, width=80) print(wrapped_text) else: print(full_text) def get_property(data, field_name, default = ''): return data[field_name] if field_name in data else default def display_schema(data): fields = data['fields'] df = pd.DataFrame({ "Column": map(lambda field: get_property(field, 'name'), fields), "Type": map(lambda field: get_property(field, 'type'), fields), "Description": map(lambda field: get_property(field, 'description', '-'), fields), "Mode": map(lambda field: get_property(field, 'mode'), fields) }) display(df) def display_section_title(text): display(HTML('<h2>{}</h2>'.format(text))) def format_bq_table_ref(table_ref): return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId']) def format_looker_table_ref(table_ref): return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri']) def display_datasource(datasource): source_name = '' if 'studioDatasourceId' in datasource: source_name = datasource['studioDatasourceId'] elif 'lookerExploreReference' in datasource: source_name = format_looker_table_ref(datasource['lookerExploreReference']) else: source_name = format_bq_table_ref(datasource['bigqueryTableReference']) print(source_name) display_schema(datasource['schema']) def handle_schema_response(resp): if 'query' in resp: print(resp['query']['question']) elif 'result' in resp: display_section_title('Schema resolved') print('Data sources:') for datasource in resp['result']['datasources']: display_datasource(datasource) def handle_data_response(resp): if 'query' in resp: query = resp['query'] display_section_title('Retrieval query') print('Query name: {}'.format(query['name'])) if 'question' in query: print('Question: {}'.format(query['question'])) if 'datasources' in query: print('Data sources:') for datasource in query['datasources']: display_datasource(datasource) elif 'generatedSql' in resp: display_section_title('SQL generated') print(resp['generatedSql']) elif 'result' in resp: display_section_title('Data retrieved') fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields']) dict = {} for field in fields: dict[field] = map(lambda el: get_property(el, field), resp['result']['data']) display(pd.DataFrame(dict)) def handle_chart_response(resp): if 'query' in resp: print(resp['query']['instructions']) elif 'result' in resp: vegaConfig = resp['result']['vegaConfig'] alt.Chart.from_json(json_lib.dumps(vegaConfig)).display(); def handle_error(resp): display_section_title('Error') print('Code: {}'.format(resp['code'])) print('Message: {}'.format(resp['message'])) def get_stream(url, json): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = '' def get_stream_multi_turn(url, json, conversation_messages): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) # Store the response that will be used in the next iteration conversation_messages.append(data_json) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = ''