本頁面說明如何使用 Python 向 對話式數據分析 API (透過 geminidataanalytics.googleapis.com 存取) 發出 HTTP 要求。
本頁面的 Python 程式碼範例說明如何完成下列工作:
- 設定初始設定和驗證。
- 連線至 Looker、BigQuery、數據分析、AlloyDB for PostgreSQL、Cloud SQL 或 Spanner 資料來源。
- 建立資料代理。
- 建立對話。
- 管理資料代理程式和對話。
- 使用 API 提問。
- 建立無狀態的多輪對話。
本頁面結尾會提供完整的程式碼範例,以及用於串流 API 回應的輔助函式。
設定初始設定和驗證
下列 Python 程式碼範例會執行這些工作:
- 匯入必要的 Python 程式庫
- 使用 Google Cloud CLI 取得 HTTP 驗證的存取權杖
- 定義報帳專案、位置和系統指令的變數
import json
import json as json_lib
import textwrap
import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests
from google.colab import auth
auth.authenticate_user()
access_token = !gcloud auth application-default print-access-token
headers = {
"Authorization": f"Bearer {access_token[0]}",
"Content-Type": "application/json",
"x-server-timeout": "300", # Custom timeout up to 600s
}
billing_project = 'YOUR-BILLING-PROJECT'
location = "global"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:您啟用必要 API 的報帳專案 ID。
- YOUR-SYSTEM-INSTRUCTIONS:系統指令,可引導代理程式的行為,並根據您的資料需求進行自訂。舉例來說,您可以使用系統指令定義業務用語、控制回覆長度,或設定資料格式。建議您使用「撰寫有效的系統指令」中建議的 YAML 格式定義系統指令,提供詳細且結構化的指引。
向 Looker 進行驗證
如要連結至 Looker 資料來源,請先驗證 Looker 執行個體。
使用 API 金鑰
下列 Python 程式碼範例說明如何使用 API 金鑰,向 Looker 執行個體驗證代理程式。
looker_credentials = {
"oauth": {
"secret": {
"client_id": "YOUR-LOOKER-CLIENT-ID",
"client_secret": "YOUR-LOOKER-CLIENT-SECRET",
}
}
}
請依下列方式替換範例值:
- YOUR-LOOKER-CLIENT-ID:您產生的 Looker API 金鑰用戶端 ID。
- YOUR-LOOKER-CLIENT-SECRET:您產生的 Looker API 金鑰用戶端密鑰。
使用存取權杖
下列 Python 程式碼範例說明如何使用存取權杖,向 Looker 執行個體驗證代理程式。
looker_credentials = {
"oauth": {
"token": {
"access_token": "YOUR-TOKEN",
}
}
}
請依下列方式替換範例值:
- YOUR-TOKEN:您產生的
access_token值,用於向 Looker 驗證。
向 AlloyDB for PostgreSQL 驗證
如要連線至 AlloyDB 資料來源,請使用 gcloud CLI 向 AlloyDB 執行個體進行驗證。
- 在 AlloyDB 執行個體上啟用
alloydb.iam_authentication標記,即可在 AlloyDB 上啟用 Identity and Access Management (IAM) 驗證。 - 授予 IAM 角色。呼叫 Conversational Analytics API 的主體 (使用者或服務帳戶) 必須在Google Cloud 專案中具備適當的 IAM 權限。通常包括
alloydb.databaseUser等角色。 - 建立對應的資料庫使用者。使用使用者或服務帳戶電子郵件地址做為使用者名稱,建立與 IAM 主體對應的 AlloyDB 使用者。
詳情請參閱「IAM 資料庫驗證」。
驗證 MySQL 適用的 Cloud SQL 和 PostgreSQL 適用的 Cloud SQL
如要連線至 MySQL 適用的 Cloud SQL 或 PostgreSQL 適用的 Cloud SQL 資料來源,您必須驗證 Cloud SQL 執行個體。
啟用 IAM 資料庫驗證。請確認 MySQL 適用的 Cloud SQL 執行個體中的
cloudsql.iam_authentication旗標已設為on。您可以在建立執行個體時新增這項設定,也可以修補現有執行個體時新增。gcloud sql instances patch INSTANCE_NAME --database-flags cloudsql.iam_authentication=on授予必要的 IAM 角色。
嘗試連線的主體 (使用者或服務帳戶) 必須具備專案的
cloudsql.instances.login和cloudsql.instances.connectIAM 權限。如要授予這些角色和權限,請採取下列任一做法:- 授予
roles/cloudsql.instanceUser角色。包括登入執行個體的權限 (cloudsql.instances.login),以及使用 Proxy 連線的權限 (cloudsql.instances.connect)。 - 或者,授予至少包含
cloudsql.instances.login和cloudsql.instances.connect的自訂角色。
- 授予
將 IAM 主體新增為資料庫使用者。在 Cloud SQL 執行個體中,建立對應至 IAM 主體電子郵件地址的使用者。
如果是服務帳戶,請執行下列指令:
gcloud sql users create SERVICE_ACCOUNT_EMAIL --instance=INSTANCE_NAME --type=cloud_iam_service_account如果是使用者帳戶,請執行下列指令:
gcloud sql users create USER_EMAIL --instance=INSTANCE_NAME --type=cloud_iam_user授予資料庫權限。透過 IAM 連線可驗證使用者身分,但不會自動授予資料庫權限。您需要使用標準 PostgreSQL 指令 (例如
GRANT),授予這個新資料庫使用者特定物件 (例如資料表和結構定義) 的權限。以超級使用者 (例如預設的 postgres 使用者) 身分連線,然後執行下列指令:
-- Example: Connect using psql as the 'postgres' user GRANT SELECT ON ALL TABLES IN SCHEMA public TO "user-or-service-account@example.com";使用 IAM 驗證機制連線。
用戶端通常會透過 Cloud SQL 驗證 Proxy 或連接器程式庫連線,這類工具會處理權杖交換作業。資料庫使用者名稱是 IAM 主體的電子郵件地址。系統不會使用密碼。改用 OAuth 2.0 權杖,通常由 Cloud SQL Auth Proxy 或 Google Cloud CLI 自動處理。
下列範例使用 psql 和 gcloud CLI 產生權杖:
PGPASSWORD=$(gcloud sql generate-login-token) psql --host=HOST_IP \ --username=IAM_PRINCIPAL_EMAIL --dbname=DATABASE_NAME\詳情請參閱「IAM 驗證」。
請依下列方式替換範例值:
- :要修補的 Cloud SQL 執行個體 ID,以啟用 IAM 資料庫驗證。
INSTANCE_NAME SERVICE_ACCOUNT_EMAIL:服務帳戶的電子郵件地址。USER_EMAIL:使用者帳戶的電子郵件地址。HOST_IP:Cloud SQL 執行個體的 IP 位址。IAM_PRINCIPAL_EMAIL:IAM 主體的電子郵件地址,例如服務帳戶或使用者。DATABASE_NAME:要連線的資料庫名稱。
- :要修補的 Cloud SQL 執行個體 ID,以啟用 IAM 資料庫驗證。
向 Spanner 進行驗證
如果您打算連線至 Spanner 資料來源,請先驗證 Spanner 執行個體。
- 授予 IAM 角色。呼叫 Conversational Analytics API 的主體 (使用者或服務帳戶) 必須在Google Cloud 專案中具備適當的 IAM 權限。通常包括
databaseUser等角色。 - 使用資料定義語言 (DDL) 在 Spanner 中建立對應的抽象資料庫角色。
- 使用 IAM 政策,允許 IAM 主體擔任這些資料庫角色。
詳情請參閱「精細的存取控管權限」。
連結至資料來源
以下各節說明如何定義代理程式資料來源的連線詳細資料。虛擬服務專員可透過下列方式連結至資料:
連結至 Looker 資料
下列程式碼範例會定義與 Looker 探索的連線。如要與 Looker 執行個體建立連線,請確認您已產生 Looker API 金鑰,詳情請參閱「使用對話式數據分析 API 驗證及連線至資料來源」。您一次最多可透過 Conversational Analytics API 連結五個 Looker 探索。
連結至 Looker 資料來源時,請注意下列事項:
- 您可以在對話中查詢任何包含的探索。
- 代理程式一次只能查詢一個 Explore。您無法同時查詢多個探索。
- 在同一場對話中,代理程式可以查詢多個探索。
在包含多個問題的對話中,或在包含後續問題的對話中,代理程式可以查詢多個探索。
舉例來說,使用者連結了兩個探索,分別是
cat-explore和dog-explore。使用者輸入「貓的數量比較多還是狗的數量比較多?」這個問題,這會建立兩項查詢:一項用於計算「cat-explore」中的貓隻數量,另一項則用於計算「dog-explore」中的狗隻數量。代理程式完成這兩項查詢後,會比較兩項查詢的結果。
下列程式碼範例會定義與多個 Looker 探索的連線。如要提升代理程式效能,您可以選擇提供黃金查詢,做為探索的結構化背景資訊。詳情請參閱「為 Looker 資料來源定義資料代理程式環境」。
looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
"looker": {
"explore_references": [
{
"looker_instance_uri": looker_instance_uri,
"lookml_model": "your_model",
"explore": "your_explore",
},
{
"looker_instance_uri": looker_instance_uri,
"lookml_model": "your_model_2",
"explore": "your_explore_2",
},
# Add up to 5 total Explore references
],
# Do not include the following line during agent creation
"credentials": looker_credentials
}
}
請依下列方式替換範例值:
- https://your_company.looker.com:Looker 執行個體的完整網址。
- your_model:包含要連結的「探索」的 LookML 模型名稱。
- your_explore:您希望資料代理程式查詢的 Looker 探索名稱。
- your_model_2:第二個 LookML 模型的名稱,其中包含要連結的「探索」。您最多可以為五個探索重複使用這個變數,以新增其他模型。
- your_explore_2:您希望資料代理程式查詢的其他 Looker 探索名稱。您可以重複使用這個變數,最多加入五個探索。
連結至 BigQuery 資料
使用對話式數據分析 API 時,可連結的 BigQuery 資料表數量沒有硬性限制。不過,連結大量資料表可能會降低準確率,或導致您超出模型的輸入權杖限制。
下列程式碼範例定義了與多個 BigQuery 資料表的連線,並包含選用結構化內容欄位的範例。如要提升代理程式效能,您可以選擇提供 BigQuery 資料表的結構化情境,例如資料表和資料欄說明、同義字、標記和查詢範例。詳情請參閱「為 BigQuery 資料來源定義資料代理程式環境」。
bigquery_data_sources = {
"bq": {
"tableReferences": [
{
"projectId": "my_project_id",
"datasetId": "my_dataset_id",
"tableId": "my_table_id",
"schema": {
"description": "my_table_description",
"fields": [{
"name": "my_column_name",
"description": "my_column_description"
}]
}
},
{
"projectId": "my_project_id_2",
"datasetId": "my_dataset_id_2",
"tableId": "my_table_id_2"
},
{
"projectId": "my_project_id_3",
"datasetId": "my_dataset_id_3",
"tableId": "my_table_id_3"
},
]
}
}
請依下列方式替換範例值:
- my_project_id:包含要連結的 BigQuery 資料集和資料表的 Google Cloud 專案 ID。如要連線至公開資料集,請指定
bigquery-public-data。 - my_dataset_id:BigQuery 資料集的 ID。
- my_table_id:BigQuery 資料表的 ID。
- my_table_description:資料表內容和用途的選用說明。
- my_column_name:表格中資料欄的名稱,您可為該資料欄提供選填說明。
- my_column_description:資料欄內容和用途的選填說明。
連結至數據分析資料
下列程式碼範例定義了與數據分析資料來源的連線。
looker_studio_data_source = {
"studio":{
"studio_references": [
{
"studio_datasource_id": "studio_datasource_id"
}
]
}
}
將 studio_datasource_id 替換為資料來源 ID。
連線至 AlloyDB 資料
對話式數據分析 API 會使用 alloydb 欄位中的 database_reference 欄位,連線至 AlloyDB 叢集。
以下範例定義與 AlloyDB 資料庫的連線。
alloydb_data_source = {
"alloydb": {
"database_reference":
{
"project_id":"PROJECT_ID",
"region":"REGION",
"cluster_id":"CLUSTER_ID",
"instance_id":"INSTANCE_ID",
"database_id":"DATABASE",
"table_ids":["TABLE_1_ID", "TABLE_2_ID"]
}
# Optional: Include this if you have pre-authored context for the agent
# "agent_context_reference": {
# "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
# }
}
}
請依下列方式替換範例值:
PROJECT_ID:包含 AlloyDB 叢集的 Google Cloud 專案 ID。REGION:AlloyDB 叢集所在的區域,例如us-central1。CLUSTER_ID:AlloyDB 叢集的 ID。INSTANCE_ID:AlloyDB 執行個體的 ID。DATABASE:目標資料庫的名稱,例如financial。TABLE_1_ID、TABLE_2_ID:選用。資料庫中的資料表清單,建議資料代理程式使用,例如"loan"、"client"、"disp"。billing_project:已啟用必要 API 的報帳專案 ID。location:情境集所在位置,例如 global。your_context_set_id:如果您為代理程式建立了進階情境,請提供情境集的 ID。
連線至 MySQL 適用的 Cloud SQL 和 PostgreSQL 適用的 Cloud SQL 資料
對話式數據分析 API 會使用 sql 欄位中的 database_reference 欄位,連線至 Cloud SQL 執行個體。
下列範例定義與 MySQL 適用的 Cloud SQL 或 PostgreSQL 適用的 Cloud SQL 資料庫的連線。
sql_data_source = {
"cloud_sql_reference": {
"database_reference":
{
"project_id":"PROJECT_ID",
"region":"REGION",
"cluster_id":"CLUSTER_ID",
"instance_id":"INSTANCE_ID",
"database_id":"DATABASE_ID",
"table_ids":["TABLE_1_ID", "TABLE_2_ID"]
}
# Optional: Include this if you have pre-authored context for the agent
# "agent_context_reference": {
# "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
# }
}
}
請依下列方式替換範例值:
PROJECT_ID:包含 Cloud SQL 執行個體的 Google Cloud 專案 ID。REGION:Cloud SQL 執行個體所在的區域,例如us-central1。CLUSTER_ID:Cloud SQL 叢集的 ID。INSTANCE_ID:Cloud SQL 執行個體的 ID。DATABASE_ID:目標資料庫的名稱,例如financial。TABLE_1_ID、TABLE_2_ID:選用。資料庫中的資料表清單,建議資料代理程式使用,例如"loan"、"client"、"disp"。billing_project:已啟用必要 API 的報帳專案 ID。location:情境集所在位置,例如 global。your_context_set_id:如果您為代理程式建立了進階情境,請提供情境集的 ID。
連結至 Spanner 資料
對話式數據分析 API 會使用 spanner 欄位中的 database_reference 欄位,連線至 Spanner 執行個體。
下列範例定義與 Spanner 資料庫的連線。
spanner_data_sources = {
"spanner_reference": {
"database_reference": {
"project_id": "PROJECT_ID",
"region": location,
"engine": GOOGLE_SQL,
"instance_id": "INSTANCE_ID",
"database_id": "DATABASE",
"table_ids":["TABLE_1_ID", "TABLE_2_ID"]
},
# Optional: Include this if you have pre-authored context for the agent
# "agent_context_reference": {
# "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
# }
}
}
請依下列方式替換範例值:
PROJECT_ID:包含 Spanner 執行個體的 Google Cloud 專案 ID。INSTANCE_ID:Spanner 執行個體的 ID。DATABASE:目標資料庫的名稱,例如financial。TABLE_1_ID、TABLE_2_ID:資料庫中的資料表清單,建議資料代理程式使用,例如"loan"、"client"、"disp"。billing_project:您已啟用必要 API 的報帳專案 ID。location:情境集所在位置,例如 global。your_context_set_id:如果您為代理程式建立進階情境,請提供情境集 ID。
建立資料代理
下列程式碼範例示範如何將 HTTP POST 要求傳送至資料代理程式建立端點,以同步或非同步方式建立資料代理程式。要求酬載包含下列詳細資料:
- 代理程式的完整資源名稱。這個值包含專案 ID、位置和代理程式的專屬 ID。
- 資料代理人的說明。
- 資料代理程式的環境,包括系統說明 (在「設定初始設定和驗證」中定義) 和代理程式使用的資料來源 (在「連結至資料來源」中定義)。
您可以選擇在建立資料代理程式時提供 kms_key,使用客戶管理的加密金鑰 (CMEK) 保護資料代理程式。CMEK 僅支援 Looker 資料來源。詳情請參閱「客戶自行管理的加密金鑰 (CMEK)」一文。
您也可以選擇在要求酬載中加入 options 參數,透過 Python 啟用進階分析。如要進一步瞭解 options 參數,以及可為對話設定的選項,請參閱「REST 資源:projects.locations.dataAgents」。
同步
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents:createSync"
data_agent_id = "data_agent_1"
# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = "" # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = "" # KEY_RING_NAME
key_name = "" # KEY_NAME
data_agent_payload = {
"name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
"description": "This is the description of data_agent_1.", # Optional
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
}
# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
if not kms_project_id:
kms_project_id = billing_project
data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"
params = {"data_agent_id": data_agent_id} # Optional
data_agent_response = requests.post(
data_agent_url, params=params, json=data_agent_payload, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent created successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error creating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
非同步
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_id = "data_agent_1"
# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = "" # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = "" # KEY_RING_NAME
key_name = "" # KEY_NAME
data_agent_payload = {
"name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
"description": "This is the description of data_agent_1.", # Optional
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
}
# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
if not kms_project_id:
kms_project_id = billing_project
data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"
params = {"data_agent_id": data_agent_id} # Optional
data_agent_response = requests.post(
data_agent_url, params=params, json=data_agent_payload, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent created successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error creating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的專屬 ID。這個值會用於代理程式的資源名稱,以及
data_agent_id網址查詢參數。 - KMS_PROJECT_ID:如要使用 CMEK,請提供金鑰所在的專案 ID。如未提供,系統會預設為您的報帳專案。
- KEY_RING_NAME:如果使用 CMEK,請輸入 Cloud KMS 金鑰環的名稱。
- KEY_NAME:如果使用 CMEK,請輸入 Cloud KMS 金鑰的名稱。
- This is the description of data_agent_1.:資料代理程式的說明。
建立對話
下列程式碼範例示範如何建立與資料代理程式的對話。
如有需要,您可以在建立對話時提供 kms_key,以 CMEK 保護對話。CMEK 僅支援 Looker 資料來源。詳情請參閱「客戶自行管理的加密金鑰 (CMEK)」一文。
conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = "" # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = "" # KEY_RING_NAME
key_name = "" # KEY_NAME
conversation_payload = {
"agents": [
f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
],
"name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
}
# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
if not kms_project_id:
kms_project_id = billing_project
conversation_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"
params = {
"conversation_id": conversation_id
}
conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)
if conversation_response.status_code == 200:
print("Conversation created successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error creating Conversation: {conversation_response.status_code}")
print(conversation_response.text)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
- conversation_1:對話的專屬 ID。
- KMS_PROJECT_ID:如要使用 CMEK,請提供金鑰所在的專案 ID。如未提供,系統會預設為您的報帳專案。
- KEY_RING_NAME:如果使用 CMEK,請輸入 Cloud KMS 金鑰環的名稱。
- KEY_NAME:如果使用 CMEK,請輸入 Cloud KMS 金鑰的名稱。
管理資料代理程式和對話
下列程式碼範例說明如何使用對話式數據分析 API 管理資料代理程式和對話。您可以執行下列工作:
取得資料代理
下列程式碼範例示範如何將 HTTP GET 要求傳送至資料代理程式資源網址,以擷取現有資料代理程式。
data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent_response = requests.get(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Fetched Data Agent successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error: {data_agent_response.status_code}")
print(data_agent_response.text)
在先前的範例中,請將 data_agent_1 替換為要擷取的資料代理程式 ID。
列出資料代理
下列程式碼示範如何將 HTTP GET 要求傳送至 dataAgents 端點,列出指定專案的所有資料代理程式。
如要列出所有代理程式,您必須具備專案的 geminidataanalytics.dataAgents.list 權限。如要進一步瞭解哪些 IAM 角色包含這項權限,請參閱預先定義的角色清單。
billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_response = requests.get(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agents Listed successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Listing Data Agents: {data_agent_response.status_code}")
將 YOUR-BILLING-PROJECT 替換為報帳專案的 ID。
列出可存取的資料代理
下列程式碼示範如何將 HTTP GET 要求傳送至 dataAgents:listAccessible 端點,列出指定專案的所有可存取資料代理程式。
billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"
params = {
"creator_filter": creator_filter
}
data_agent_response = requests.get(
data_agent_url, headers=headers, params=params
)
if data_agent_response.status_code == 200:
print("Accessible Data Agents Listed successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- YOUR-CREATOR-FILTER:根據資料代理程式建立者套用的篩選條件。可能的值包括
NONE(預設值)、CREATOR_ONLY和NOT_CREATOR_ONLY。
更新資料代理程式
下列程式碼範例示範如何將 HTTP PATCH 要求傳送至資料代理程式資源網址,以同步或非同步方式更新資料代理程式。要求酬載包含要變更的欄位新值,要求參數則包含 updateMask 參數,用於指定要更新的欄位。
同步
data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:updateSync"
payload = {
"description": "Updated description of the data agent.",
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction
}
},
}
fields = ["description", "data_analytics_agent"]
params = {
"updateMask": ",".join(fields)
}
data_agent_response = requests.patch(
data_agent_url, headers=headers, params=params, json=payload
)
if data_agent_response.status_code == 200:
print("Data Agent updated successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Updating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
非同步
data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
payload = {
"description": "Updated description of the data agent.",
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction
}
},
}
fields = ["description", "data_analytics_agent"]
params = {
"updateMask": ",".join(fields)
}
data_agent_response = requests.patch(
data_agent_url, headers=headers, params=params, json=payload
)
if data_agent_response.status_code == 200:
print("Data Agent updated successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Updating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- data_agent_1:要更新的資料代理程式 ID。
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- Updated description of the data agent.:資料代理程式的新說明。
取得資料代理程式的身分與存取權管理政策
下列程式碼範例說明如何將 HTTP POST 要求傳送至資料代理程式 URL,以擷取資料代理程式的 IAM 政策。要求酬載包含資料代理程式路徑。
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"
# Request body
payload = {
"resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}
data_agent_response = requests.post(
data_agent_url, headers=headers, json=payload
)
if data_agent_response.status_code == 200:
print("IAM Policy fetched successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error fetching IAM policy: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- data_agent_1:要取得 IAM 政策的資料代理程式 ID。
設定資料代理程式的 IAM 政策
如要共用代理程式,可以使用 setIamPolicy 方法,將 IAM 角色指派給特定代理程式的使用者。下列程式碼範例示範如何使用包含繫結的酬載,對資料代理程式 URL 進行 POST 呼叫。繫結會指定要將哪些角色指派給哪些使用者。
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"
# Request body
payload = {
"policy": {
"bindings": [
{
"role": role,
"members": [
f"user:{i.strip()}" for i in users.split(",")
]
}
]
}
}
data_agent_response = requests.post(
data_agent_url, headers=headers, json=payload
)
if data_agent_response.status_code == 200:
print("IAM Policy set successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error setting IAM policy: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- data_agent_1:要設定 IAM 政策的資料代理程式 ID。
- 222larabrown@gmail.com, cloudysanfrancisco@gmail.com:以半形逗號分隔的使用者電子郵件地址清單,您要將指定角色授予這些使用者。
刪除資料代理程式
下列程式碼範例示範如何將 HTTP DELETE 要求傳送至資料代理程式資源 URL,以同步或非同步方式虛刪除資料代理程式。軟刪除是指刪除代理程式,但仍可在 30 天內擷取。
同步
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:deleteSync"
data_agent_response = requests.delete(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent deleted successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
非同步
billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent_response = requests.delete(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent deleted successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- data_agent_1:要刪除的資料代理程式 ID。
取得對話
下列程式碼範例示範如何將 HTTP GET 要求傳送至對話資源網址,以擷取現有對話。
billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- conversation_1:要擷取的對話 ID。
列出對話
下列程式碼範例示範如何將 HTTP GET 要求傳送至 conversations 端點,列出特定專案的對話。
根據預設,這個方法會傳回您建立的對話。管理員 (具備 cloudaicompanion.topicAdmin IAM 角色的使用者) 可以查看專案中的所有對話。
billing_project = "YOUR-BILLING-PROJECT"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
將 YOUR-BILLING-PROJECT 替換為已啟用必要 API 的報帳專案 ID。
列出對話中的訊息
下列程式碼範例說明如何將 HTTP GET 要求傳送至對話的 messages 端點,列出對話中的所有訊息。
如要列出訊息,您必須具備對話的 cloudaicompanion.topics.get 權限。
billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- conversation_1:要列出訊息的對話 ID。
刪除對話
下列程式碼範例示範如何將 HTTP DELETE 要求傳送至對話資源網址,藉此刪除對話。管理員 (具備 cloudaicompanion.topicAdmin 身分與存取權管理角色的使用者) 或具備 cloudaicompanion.topics.delete 身分與存取權管理權限的使用者,可以刪除專案中的對話。
billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
conversation_response = requests.delete(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation deleted successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while deleting conversation: {conversation_response.status_code}")
print(conversation_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:報帳專案的 ID。
- conversation_1:要刪除的對話 ID。
使用 API 提問
建立資料代理程式後,如果是具狀態的即時通訊,請建立對話,然後將查詢傳送至代理程式。
傳送查詢時,API 會傳回 Message 物件的串流。這個串流可以包含不同類型的訊息,包括文字、資料表和圖表。簡訊可提供代理的推論過程、進度報告或最終答案。每則簡訊的用途會以 TextType 值表示:
THOUGHT:顯示代理的內部思考過程,瞭解代理如何規劃查詢回覆。THOUGHT訊息會逐步深入瞭解代理的推論和決策過程,並包含兩部分:parts[0]是想法摘要,簡要總結完整的想法文字,parts[1]則是完整的想法文字。PROGRESS:回報代理程式的動作進度,例如資料擷取或正在呼叫的工具。這個值只會針對 Looker 資料來源傳回,且包含兩部分:parts[0]是摘要,parts[1]則是完整進度文字。FINAL_RESPONSE:提供查詢的最終答案。
下一節的範例會使用「Helper Python functions to stream chat responses」中定義的輔助函式,處理及顯示 API 串流回應中的每則訊息。如要瞭解如何在使用 Looker 資料來源時,在使用者介面中顯示這些訊息,請參閱「顯示 Looker 資料來源的代理程式回覆」。
在有狀態和無狀態的對話中提問
您可以透過下列主要模式與 API 互動:
- 有狀態的即時通訊: Google Cloud 儲存及管理對話記錄。有狀態的即時通訊本質上是多輪對話,因為 API 會保留先前訊息的背景資訊。您只需要在每個回合傳送當下的訊息。
- 無狀態對話:應用程式會管理對話記錄。每則新訊息都必須附上相關的舊訊息。如需在無狀態模式下管理多輪對話的詳細範例,請參閱「建立無狀態多輪對話」。
下列範例示範如何透過向 :chat 端點發出 POST 要求,使用 API 進行有狀態和無狀態的對話。
有狀態的對話
傳送含有對話參照的具狀態即時通訊要求
下列程式碼範例說明如何使用您在先前步驟中定義的 conversation 向 API 提問。這個範例使用 get_stream 輔助函式串流回應。
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"conversation_reference": {
"conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
- conversation_1:對話的專屬 ID。
- 我們以
Make a bar graph for the top 5 states by the total number of airports做為範例提示。
無狀態對話
下列範例說明如何使用資料代理程式參照或內嵌背景資訊,傳送無狀態要求。
傳送含有資料代理程式參照的無狀態即時通訊要求
下列程式碼範例示範如何使用先前步驟中定義的資料代理程式,向 API 提出無狀態問題。這個範例使用 get_stream 輔助函式串流回應。
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
- 我們以
Make a bar graph for the top 5 states by the total number of airports做為範例提示。
傳送含有內嵌背景資訊的無狀態對話要求
下列程式碼範例示範如何使用內嵌環境,向 API 提出無狀態問題。這個範例會使用 get_stream 輔助函式串流回應,並以 BigQuery 資料來源為例。
您也可以選擇在要求酬載中加入 options 參數,透過 Python 啟用進階分析。如要進一步瞭解 options 參數,以及可為對話設定的選項,請參閱「REST 資源:projects.locations.dataAgents」頁面。
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"inline_context": {
"datasource_references": bigquery_data_sources,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
下列程式碼範例示範如何使用內嵌環境和 AlloyDB 資料來源,向 API 提出無狀態問題。
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"
# The natural language question to ask the data agent
user_prompt = "what is the average loan amount in the US?" # Replace with your question
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": user_prompt
}
}
],
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
}
}
print(f"Sending prompt to :chat: '{user_prompt}'")
print(f"Endpoint: {chat_url}")
print(f"Payload: {json.dumps(chat_payload, indent=2)}")
get_stream(chat_url, chat_payload)
建立無狀態多輪對話
如要在無狀態對話中詢問後續問題,應用程式必須管理對話內容,方法是在每次提出新要求時,傳送完整的訊息記錄。以下各節說明如何定義及呼叫輔助函式,建立多輪對話:
傳送多輪對話要求
下列 multi_turn_Conversation 輔助函式會將訊息儲存在清單中,藉此管理對話內容。你可以根據先前的對話內容提出後續問題。在函式的酬載中,您可以參照資料代理程式,或使用內嵌環境直接提供資料來源。
chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"
# List that is used to track previous turns and is reused across requests
conversation_messages = []
data_agent_id = "data_agent_1"
# Helper function for calling the API
def multi_turn_Conversation(msg):
userMessage = {
"userMessage": {
"text": msg
}
}
# Send a multi-turn request by including previous turns and the new message
conversation_messages.append(userMessage)
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": conversation_messages,
# Use a data agent reference
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
},
# Use inline context
# "inline_context": {
# "datasource_references": bigquery_data_sources,
# }
}
# Call the get_stream_multi_turn helper function to stream the response
get_stream_multi_turn(chat_url, chat_payload, conversation_messages)
在先前的範例中,將 data_agent_1 替換為資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
您可以針對對話的每個回合呼叫 multi_turn_Conversation 輔助函式。下列程式碼範例說明如何傳送初始要求,然後根據先前的回覆內容傳送後續要求。
# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")
# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")
在先前的範例中,請依下列方式替換範例值:
- Which species of tree is most prevalent?:要傳送給資料代理程式的自然語言問題。
- Can you show me the results as a bar chart?:根據或修正先前問題的後續問題。
處理回覆
下列 get_stream_multi_turn 函式會處理串流 API 回應。這個函式與 get_stream 輔助函式類似,但會將回應儲存在 conversation_messages 清單中,以便為下一個回合儲存對話內容。
def get_stream_multi_turn(url, json, conversation_messages):
s = requests.Session()
acc = ''
with s.post(url, json=json, headers=headers, stream=True) as resp:
for line in resp.iter_lines():
if not line:
continue
decoded_line = str(line, encoding='utf-8')
if decoded_line == '[{':
acc = '{'
elif decoded_line == '}]':
acc += '}'
elif decoded_line == ',':
continue
else:
acc += decoded_line
if not is_json(acc):
continue
data_json = json_lib.loads(acc)
# Store the response that will be used in the next iteration
conversation_messages.append(data_json)
if not 'systemMessage' in data_json:
if 'error' in data_json:
handle_error(data_json['error'])
continue
if 'text' in data_json['systemMessage']:
handle_text_response(data_json['systemMessage']['text'])
elif 'schema' in data_json['systemMessage']:
handle_schema_response(data_json['systemMessage']['schema'])
elif 'data' in data_json['systemMessage']:
handle_data_response(data_json['systemMessage']['data'])
elif 'chart' in data_json['systemMessage']:
handle_chart_response(data_json['systemMessage']['chart'])
else:
colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
print(colored_json)
print('\n')
acc = ''
端對端程式碼範例
下列可展開的程式碼範例包含本指南涵蓋的所有工作。
使用 HTTP 和 Python 建構 BigQuery 和 Looker 的資料代理程式
import json import json as json_lib import textwrap import altair as alt import google.auth from google.auth.transport.requests import Request import IPython from IPython.display import display, HTML import pandas as pd from pygments import formatters, highlight, lexers import requests from google.colab import auth auth.authenticate_user() access_token = !gcloud auth application-default print-access-token headers = { "Authorization": f"Bearer {access_token[0]}", "Content-Type": "application/json", "x-server-timeout": "300", # Custom timeout up to 600s } ################### Data source details ################### billing_project = "your_billing_project" location = "global" system_instruction = "Help the user in analyzing their data" # BigQuery data source bigquery_data_sources = { "bq": { "tableReferences": [ { "projectId": "bigquery-public-data", "datasetId": "san_francisco", "tableId": "street_trees" } ] } } # Looker data source looker_credentials = { "oauth": { "secret": { "client_id": "your_looker_client_id", "client_secret": "your_looker_client_secret", } } } # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block. # looker_credentials = { # "oauth": { # "token": { # "access_token": "your_looker_access_token", # } # } # } looker_data_source = { "looker": { "explore_references": [ { "looker_instance_uri": looker_instance_uri, "lookml_model": "your_model", "explore": "your_explore", }, { "looker_instance_uri": looker_instance_uri, "lookml_model": "your_model_2", "explore": "your_explore_2", }, # Add up to 5 total Explore references ], # Do not include the following line during agent creation # "credentials": looker_credentials } # Looker Studio data source looker_studio_data_source = { "studio":{ "studio_references": [ { "studio_datasource_id": "studio_datasource_id" } ] } } ################### Create data agent ################### data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents" data_agent_id = "data_agent_1" data_agent_payload = { "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional "description": "This is the description of data_agent.", # Optional "data_analytics_agent": { "published_context": { "datasource_references": bigquery_data_sources, "system_instruction": system_instruction, # Optional: To enable advanced analysis with Python, include the following options block: "options": { "analysis": { "python": { "enabled": True } } } } } } params = {"data_agent_id": data_agent_id} # Optional data_agent_response = requests.post( data_agent_url, params=params, json=data_agent_payload, headers=headers ) if data_agent_response.status_code == 200: print("Data Agent created successfully!") print(json.dumps(data_agent_response.json(), indent=2)) else: print(f"Error creating Data Agent: {data_agent_response.status_code}") print(data_agent_response.text) ################### Create conversation ################### conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations" data_agent_id = "data_agent_1" conversation_id = "conversation _1" conversation_payload = { "agents": [ f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}" ], "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}" } params = { "conversation_id": conversation_id } conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload) if conversation_response.status_code == 200: print("Conversation created successfully!") print(json.dumps(conversation_response.json(), indent=2)) else: print(f"Error creating Conversation: {conversation_response.status_code}") print(conversation_response.text) ################### Chat with the API by using conversation (stateful) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" conversation_id = "conversation _1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "conversation_reference": { "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}", "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using dataAgents (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using inline context (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "inline_context": { "datasource_references": bigquery_data_sources, # Optional - if wanting to use advanced analysis with python "options": { "analysis": { "python": { "enabled": True } } } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Multi-turn conversation ################### chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat" # List that is used to track previous turns and is reused across requests conversation_messages = [] data_agent_id = "data_agent_1" # Helper function for calling the API def multi_turn_Conversation(msg): userMessage = { "userMessage": { "text": msg } } # Send a multi-turn request by including previous turns and the new message conversation_messages.append(userMessage) # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": conversation_messages, # Use a data agent reference "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials }, # Use inline context # "inline_context": { # "datasource_references": bigquery_data_sources, # } } # Call the get_stream_multi_turn helper function to stream the response get_stream_multi_turn(chat_url, chat_payload, conversation_messages) # Send first-turn request multi_turn_Conversation("Which species of tree is most prevalent?") # Send follow-up-turn request multi_turn_Conversation("Can you show me the results as a bar chart?")
定義輔助函式
下列程式碼範例包含先前程式碼範例中使用的輔助函式定義。這些函式有助於剖析 API 的回應並顯示結果。
用於串流對話回覆的輔助 Python 函式
def is_json(str): try: json_object = json_lib.loads(str) except ValueError as e: return False return True def handle_text_response(resp): parts = resp['parts'] full_text = "".join(parts) if "\n" not in full_text and len(full_text) > 80: wrapped_text = textwrap.fill(full_text, width=80) print(wrapped_text) else: print(full_text) def get_property(data, field_name, default = ''): return data[field_name] if field_name in data else default def display_schema(data): fields = data['fields'] df = pd.DataFrame({ "Column": map(lambda field: get_property(field, 'name'), fields), "Type": map(lambda field: get_property(field, 'type'), fields), "Description": map(lambda field: get_property(field, 'description', '-'), fields), "Mode": map(lambda field: get_property(field, 'mode'), fields) }) display(df) def display_section_title(text): display(HTML('<h2>{}</h2>'.format(text))) def format_bq_table_ref(table_ref): return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId']) def format_looker_table_ref(table_ref): return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri']) def display_datasource(datasource): source_name = '' if 'studioDatasourceId' in datasource: source_name = datasource['studioDatasourceId'] elif 'lookerExploreReference' in datasource: source_name = format_looker_table_ref(datasource['lookerExploreReference']) else: source_name = format_bq_table_ref(datasource['bigqueryTableReference']) print(source_name) display_schema(datasource['schema']) def handle_schema_response(resp): if 'query' in resp: print(resp['query']['question']) elif 'result' in resp: display_section_title('Schema resolved') print('Data sources:') for datasource in resp['result']['datasources']: display_datasource(datasource) def handle_data_response(resp): if 'query' in resp: query = resp['query'] display_section_title('Retrieval query') print('Query name: {}'.format(query['name'])) if 'question' in query: print('Question: {}'.format(query['question'])) if 'datasources' in query: print('Data sources:') for datasource in query['datasources']: display_datasource(datasource) elif 'generatedSql' in resp: display_section_title('SQL generated') print(resp['generatedSql']) elif 'result' in resp: display_section_title('Data retrieved') fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields']) dict = {} for field in fields: dict[field] = map(lambda el: get_property(el, field), resp['result']['data']) display(pd.DataFrame(dict)) def handle_chart_response(resp): if 'query' in resp: print(resp['query']['instructions']) elif 'result' in resp: vegaConfig = resp['result']['vegaConfig'] alt.Chart.from_json(json_lib.dumps(vegaConfig)).display(); def handle_error(resp): display_section_title('Error') print('Code: {}'.format(resp['code'])) print('Message: {}'.format(resp['message'])) def get_stream(url, json): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = '' def get_stream_multi_turn(url, json, conversation_messages): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) # Store the response that will be used in the next iteration conversation_messages.append(data_json) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = ''