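Build a data agent using HTTP and Python

This page guides you through using Python to make HTTP requests to the Conversational Analytics API (accessed through geminidataanalytics.googleapis.com).

The sample Python code on this page shows how to complete the tasks described in the sections that follow, from initial setup and authentication through asking questions.

A complete version of the sample code is included at the end of this page, along with the helper functions that are used to stream the API response.

Configure initial settings and authentication

The following sample Python code performs these tasks:

  • Imports the required Python libraries
  • Obtains an access token for HTTP authentication by using the Google Cloud CLI
  • Defines variables for the billing project and system instructions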
from pygments import highlight, lexers, formatters
import pandas as pd
import json as json_lib
import requests
import json
import altair as alt
import IPython
from IPython.display import display, HTML
import google.auth
from google.auth.transport.requests import Request

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
}

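billing_project = 'YOUR-BILLING-PROJECT'
location = "global"  # Location used in the resource paths on this page
base_url = "https://geminidataanalytics.googleapis.com"  # API host used by the management examples
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'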

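Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of the billing project where you enabled the required APIs.
  • YOUR-SYSTEM-INSTRUCTIONS: System instructions that guide the agent's behavior and customize it for your data needs. For example, you can use system instructions to define business terms, control response length, or set data formatting. Ideally, define system instructions in the YAML format recommended in "Write effective system instructions" to provide detailed and structured guidance.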
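
The `!gcloud` line in the sample above is IPython shell syntax, so it only runs in a notebook such as Colab. Outside a notebook, one possible approach is to get the token from Application Default Credentials with the google-auth library. The following is a minimal sketch under that assumption. The function names fetch_access_token and build_headers are hypothetical, and the 600-second ceiling is taken from the comment in the sample above.

```python
def fetch_access_token():
    """Return an access token from Application Default Credentials.

    Assumes the google-auth package is installed and that credentials are
    available, for example after `gcloud auth application-default login`.
    """
    # Imported lazily so build_headers stays usable without google-auth.
    import google.auth
    from google.auth.transport.requests import Request

    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(Request())  # Populates credentials.token
    return credentials.token


def build_headers(token, timeout_seconds=300):
    """Build the request headers used by the samples on this page."""
    if not 0 < timeout_seconds <= 600:
        raise ValueError("timeout_seconds must be between 1 and 600")
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "x-server-timeout": str(timeout_seconds),
    }
```

With these helpers, `headers = build_headers(fetch_access_token())` would stand in for the notebook-only token lookup.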

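Authenticate to Looker

If you plan to connect to a Looker data source, you need to authenticate to the Looker instance.

Use API keys

The following Python code sample demonstrates how to authenticate your agent to a Looker instance by using API keys.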

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

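Replace the sample values as follows:

  • YOUR-LOOKER-CLIENT-ID: The client ID of your generated Looker API key.
  • YOUR-LOOKER-CLIENT-SECRET: The client secret of your generated Looker API key.

Use an access token

The following Python code sample demonstrates how to authenticate your agent to a Looker instance by using an access token.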

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

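Replace the sample values as follows:

  • YOUR-TOKEN: The access_token value that you generated to authenticate to Looker.

Connect to a data source

The following sections show how to define the connection details for your agent's data sources. Your agent can connect to data in Looker, BigQuery, or Looker Studio.

Connect to Looker data

The following sample code defines a connection to Looker Explores. To establish a connection with a Looker instance, verify that you have generated Looker API keys, as described in "Authenticate and connect to a data source with the Conversational Analytics API". You can connect to up to five Looker Explores at a time with the Conversational Analytics API.

When you connect to a Looker data source, note the following:

  • You can query any included Explore in a conversation.
  • An agent can query only one Explore at a time. A single query can't span multiple Explores.
  • An agent can query multiple Explores in the same conversation.
  • An agent can query multiple Explores in a conversation that includes a multi-part question or follow-up questions.

    For example: A user connects two Explores, one called cat-explore and one called dog-explore. The user enters the question "Which is greater: the count of cats or the count of dogs?" This creates two queries: one to count the number of cats in cat-explore and one to count the number of dogs in dog-explore. After both queries complete, the agent compares the numbers from the two queries.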

looker_instance_uri = "https://my_company.looker.com"
looker_data_source = {
    "looker": {
        # explore_references is a list with one entry per Explore (up to five)
        "explore_references": [
            {
                "looker_instance_uri": "https://your_company.looker.com",
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
        ],
        # Do not include the following line during agent creation
        "credentials": looker_credentials
    }
}

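Replace the sample values as follows:

  • https://your_company.looker.com: The complete URL of your Looker instance.
  • your_model: The name of the LookML model that includes the Explore that you want to connect to.
  • your_explore: The name of the Looker Explore that you want the data agent to query.
  • your_model_2: The name of the second LookML model that includes the Explore that you want to connect to. You can repeat this variable for additional models, for up to five Explores.
  • your_explore_2: The name of the additional Looker Explore that you want the data agent to query. You can repeat this variable to include up to five Explores.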
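
The five-Explore limit described in this section can be checked on the client before a request is sent. The following is a small sketch of that idea, not part of the API. The helper name build_looker_data_source and its arguments are hypothetical, and the field names mirror the sample above.

```python
MAX_EXPLORES = 5  # Limit stated in this guide


def build_looker_data_source(instance_uri, explores, credentials=None):
    """Build a Looker data source dictionary.

    explores is a list of (lookml_model, explore) pairs that all live on
    the same Looker instance.
    """
    if not 1 <= len(explores) <= MAX_EXPLORES:
        raise ValueError(f"Expected 1 to {MAX_EXPLORES} Explores, got {len(explores)}")

    references = [
        {
            "looker_instance_uri": instance_uri,
            "lookml_model": model,
            "explore": explore,
        }
        for model, explore in explores
    ]
    looker = {"explore_references": references}
    # Leave credentials out when the dictionary is used for agent creation.
    if credentials is not None:
        looker["credentials"] = credentials
    return {"looker": looker}
```

For example, `build_looker_data_source("https://your_company.looker.com", [("your_model", "your_explore")])` returns a dictionary with a single Explore reference and no credentials.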

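Connect to BigQuery data

With the Conversational Analytics API, there is no hard limit on the number of BigQuery tables that you can connect to. However, connecting to a large number of tables can reduce accuracy or cause you to exceed the model's input token limit.

The following sample code defines a connection to multiple BigQuery tables and includes examples of optional structured context fields. To improve agent performance, you can optionally provide structured context for your BigQuery tables, such as table and column descriptions, synonyms, tags, and example queries. For more information, see "Define data agent context for BigQuery data sources".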

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id",
                "schema": {
                    "description": "my_table_description",
                    "fields": [{
                        "name": "my_column_name",
                        "description": "my_column_description"
                    }]
                }
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

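Replace the sample values as follows:

  • my_project_id: The ID of the Google Cloud project that contains the BigQuery dataset and table that you want to connect to. To connect to a public dataset, specify bigquery-public-data.
  • my_dataset_id: The ID of the BigQuery dataset.
  • my_table_id: The ID of the BigQuery table.
  • my_table_description: An optional description of the table's contents and purpose.
  • my_column_name: The name of a column in the table for which you are providing an optional description.
  • my_column_description: An optional description of the column's contents and purpose.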
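
When many tables are involved, writing each table reference by hand gets repetitive. As an illustration only, the following sketch builds the same structure from "project.dataset.table" strings. The helper names table_reference and build_bigquery_data_sources are hypothetical, and only the fields shown in the sample above are used.

```python
def table_reference(table_path, description=None, column_descriptions=None):
    """Build one tableReferences entry from a 'project.dataset.table' string.

    column_descriptions is an optional dict of column name -> description.
    """
    project_id, dataset_id, table_id = table_path.split(".")
    reference = {
        "projectId": project_id,
        "datasetId": dataset_id,
        "tableId": table_id,
    }
    # The schema block is optional structured context, so add it only on request.
    schema = {}
    if description:
        schema["description"] = description
    if column_descriptions:
        schema["fields"] = [
            {"name": name, "description": text}
            for name, text in column_descriptions.items()
        ]
    if schema:
        reference["schema"] = schema
    return reference


def build_bigquery_data_sources(references):
    """Wrap table references in the structure used by the samples on this page."""
    return {"bq": {"tableReferences": list(references)}}
```

For example, `build_bigquery_data_sources([table_reference("bigquery-public-data.san_francisco.street_trees")])` yields the data source used in the end-to-end sample on this page.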

Connect to Looker Studio data

The following sample code defines a connection to a Looker Studio data source.

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

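Replace studio_datasource_id with the data source ID.

Create a data agent

The following sample code demonstrates how to create a data agent by sending an HTTP POST request to the data agent creation endpoint. The request payload includes the following details:

  • The full resource name for the agent. This value includes the project ID, the location, and a unique identifier for the agent.
  • The description of the data agent.
  • The context for the data agent, including the system instructions (defined in "Configure initial settings and authentication") and the data sources that the agent uses (defined in "Connect to a data source").

You can also optionally enable advanced analysis with Python by including the options parameter in the request payload. For more information about the options parameter and the options that you can configure for a conversation, see "REST Resource: projects.locations.dataAgents".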

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional

      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

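Replace the sample values as follows:

  • data_agent_1: A unique identifier for the data agent. This value is used in the agent's resource name and as the data_agent_id URL query parameter.
  • This is the description of data_agent_1.: A description for the data agent.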
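
The resource name and the payload follow a fixed pattern, so they can be assembled by small functions instead of repeated f-strings. The sketch below is one way to do that. The function names are hypothetical, and the payload fields are exactly the ones used in the sample above.

```python
def data_agent_name(project, location, data_agent_id):
    """Return the full resource name of a data agent."""
    return f"projects/{project}/locations/{location}/dataAgents/{data_agent_id}"


def build_data_agent_payload(project, location, data_agent_id,
                             datasource_references, system_instruction,
                             description="", enable_python=False):
    """Assemble the request body for creating a data agent."""
    published_context = {
        "datasource_references": datasource_references,
        "system_instruction": system_instruction,
    }
    # The options block is optional; include it only for advanced analysis.
    if enable_python:
        published_context["options"] = {
            "analysis": {"python": {"enabled": True}}
        }
    return {
        "name": data_agent_name(project, location, data_agent_id),
        "description": description,
        "data_analytics_agent": {"published_context": published_context},
    }
```

The returned dictionary can be passed as the `json` argument of `requests.post`, in the same way as data_agent_payload in the sample above.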

Create a conversation

The following sample code demonstrates how to create a conversation with your data agent.

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

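Replace the sample values as follows:

  • data_agent_1: The ID of the data agent, as defined in the sample code block in "Create a data agent".
  • conversation_1: A unique identifier for the conversation.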
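
Most samples on this page end with the same check of the status code. If that repetition becomes a nuisance, the check could be moved into one function, as in the following sketch. The name report_response is hypothetical, and it assumes a response object with the status_code, text, and json() members that the requests library provides.

```python
import json


def report_response(response, success_message, error_prefix):
    """Print the outcome of an API call.

    Returns the parsed JSON body on HTTP 200, or None on any other status.
    """
    if response.status_code == 200:
        print(success_message)
        body = response.json()
        print(json.dumps(body, indent=2))
        return body
    print(f"{error_prefix}: {response.status_code}")
    print(response.text)
    return None
```

For example, the conversation sample above could end with `report_response(conversation_response, "Conversation created successfully!", "Error creating Conversation")`.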

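Manage data agents and conversations

The following code samples show how to manage your data agents and conversations by using the Conversational Analytics API. Each task is covered in its own section below.

Get a data agent

The following sample code demonstrates how to fetch an existing data agent by sending an HTTP GET request to the data agent resource URL.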

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

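In the preceding example, replace data_agent_1 with the ID of the data agent that you want to fetch.

List data agents

The following code demonstrates how to list all the data agents for a given project by sending an HTTP GET request to the dataAgents endpoint.

To list all agents, you must have the geminidataanalytics.dataAgents.list permission on the project. For more information about which IAM roles include this permission, see the list of predefined roles.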

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

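Replace YOUR-BILLING-PROJECT with the ID of your billing project.

List accessible data agents

The following code demonstrates how to list all the accessible data agents for a given project by sending an HTTP GET request to the dataAgents:listAccessible endpoint.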

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
location = "global"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

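Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • YOUR-CREATOR-FILTER: The filter to apply based on the creator of the data agent. Possible values include NONE (default), CREATOR_ONLY, and NOT_CREATOR_ONLY.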
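
Because the creator filter accepts only the values listed above, a typo can be caught locally before the request is sent. The following sketch shows one way to do so. The name list_accessible_params is hypothetical.

```python
# Values listed in this guide for the creator_filter parameter
CREATOR_FILTERS = ("NONE", "CREATOR_ONLY", "NOT_CREATOR_ONLY")


def list_accessible_params(creator_filter="NONE"):
    """Return query parameters for dataAgents:listAccessible."""
    if creator_filter not in CREATOR_FILTERS:
        raise ValueError(
            f"creator_filter must be one of {CREATOR_FILTERS}, got {creator_filter!r}"
        )
    return {"creator_filter": creator_filter}
```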

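Update a data agent

The following sample code demonstrates how to update a data agent by sending an HTTP PATCH request to the data agent resource URL. The request payload includes the new values for the fields that you want to change, and the request parameters include an updateMask parameter, which specifies the fields to be updated.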

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
location = "global"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

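Replace the sample values as follows:

  • data_agent_1: The ID of the data agent that you want to update.
  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • Updated description of the data agent.: A new description for the data agent.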
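
In the sample above, the update mask and the payload are written separately, so they can drift apart when one of them is edited. A possible safeguard is to derive both from a single dictionary of changes, as sketched below. The name build_update_request is hypothetical, and the sketch handles top-level fields only.

```python
def build_update_request(changes):
    """Derive PATCH query parameters and payload from one dictionary.

    changes maps each top-level field name to its new value, so the
    updateMask always lists exactly the fields present in the payload.
    """
    if not changes:
        raise ValueError("changes must contain at least one field")
    params = {"updateMask": ",".join(changes)}
    payload = dict(changes)
    return params, payload
```

For example, calling the function with "description" and "data_analytics_agent" keys produces the same updateMask value as the sample above, and the two return values can be passed to `requests.patch` as `params` and `json`.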

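Get the IAM policy for a data agent

The following sample code demonstrates how to get the IAM policy for a data agent by sending an HTTP POST request to the data agent URL. The request payload includes the data agent path.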

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

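Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • data_agent_1: The ID of the data agent for which you want to get the IAM policy.

Set the IAM policy for a data agent

To share an agent, you can use the setIamPolicy method to assign IAM roles to users on a specific agent. The following sample code demonstrates how to make a POST call to the data agent URL with a payload that includes bindings. The bindings specify which roles should be assigned to which users.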

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

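Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • data_agent_1: The ID of the data agent for which you want to set the IAM policy.
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: A comma-separated list of the email addresses of the users to whom you want to grant the specified role.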
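
In Google Cloud IAM, a setIamPolicy call generally replaces the policy on the resource rather than adding to it, so sending only the new binding can drop grants that already exist. A common pattern is to fetch the current policy, merge the new members into it, and send the merged result. The sketch below covers the merge step only and assumes the usual policy shape with a "bindings" list. The name add_members is hypothetical, and conditional bindings are not handled.

```python
import copy


def add_members(policy, role, emails):
    """Return a copy of policy with the given users added to role.

    policy is the dictionary returned by getIamPolicy (possibly empty).
    """
    updated = copy.deepcopy(policy)
    bindings = updated.setdefault("bindings", [])
    members = [f"user:{email.strip()}" for email in emails if email.strip()]

    for binding in bindings:
        if binding.get("role") == role:
            existing = binding.setdefault("members", [])
            for member in members:
                if member not in existing:  # Avoid duplicate entries
                    existing.append(member)
            break
    else:
        # No binding for this role yet, so create one.
        bindings.append({"role": role, "members": members})
    return updated
```

The merged dictionary would then be sent as the "policy" value in the setIamPolicy request body.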

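Delete a data agent

The following sample code demonstrates how to soft delete a data agent by sending an HTTP DELETE request to the data agent resource URL. Soft deletion means that the agent is deleted but can still be retrieved within 30 days.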

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

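Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • data_agent_1: The ID of the data agent that you want to delete.

Get a conversation

The following sample code demonstrates how to fetch an existing conversation by sending an HTTP GET request to the conversation resource URL.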

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

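Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • conversation_1: The ID of the conversation that you want to fetch.

List conversations

The following sample code demonstrates how to list the conversations for a given project by sending an HTTP GET request to the conversations endpoint.

By default, this method returns the conversations that you created. Admins (users with the cloudaicompanion.topicAdmin IAM role) can see all conversations within the project.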

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversations listed successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while listing conversations: {conversation_response.status_code}")
    print(conversation_response.text)

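Replace YOUR-BILLING-PROJECT with the ID of the billing project where you enabled the required APIs.

List messages in a conversation

The following sample code demonstrates how to list all the messages in a conversation by sending an HTTP GET request to the conversation's messages endpoint.

To list messages, you must have the cloudaicompanion.topics.get permission on the conversation.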

billing_project = "YOUR-BILLING-PROJECT"
location = "global"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Messages listed successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while listing messages: {conversation_response.status_code}")
    print(conversation_response.text)

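Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • conversation_1: The ID of the conversation whose messages you want to list.

Delete a conversation

The following sample code demonstrates how to delete a conversation by sending an HTTP DELETE request to the conversation resource URL. Admins (users with the cloudaicompanion.topicAdmin Identity and Access Management role) or users with the cloudaicompanion.topics.delete Identity and Access Management permission can delete conversations within the project.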

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.delete(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation deleted successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while deleting conversation: {conversation_response.status_code}")
    print(conversation_response.text)

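Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • conversation_1: The ID of the conversation that you want to delete.

Ask questions by using the API

After you create a data agent and a conversation, you can ask questions about your data.

The Conversational Analytics API supports multi-turn conversations, which let users ask follow-up questions that build on previous context. The API provides the following methods for managing conversation history:

  • Stateful chat: Google Cloud stores and manages the conversation history. Stateful chat is inherently multi-turn, because the API retains context from previous messages. You only need to send the current message in each turn.
  • Stateless chat: Your application manages the conversation history. You must include the relevant previous messages with each new message. For a detailed example of how to manage multi-turn conversations in stateless mode, see "Create a stateless multi-turn conversation".

Stateful chat

Send a stateful chat request with a conversation reference

The following sample code demonstrates how to ask the API questions by using the conversation that you defined in previous steps. This sample uses the get_stream helper function to stream the response.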

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

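Replace the sample values as follows:

  • data_agent_1: The ID of the data agent, as defined in the sample code block in "Create a data agent".
  • conversation_1: A unique identifier for the conversation.
  • Make a bar graph for the top 5 states by the total number of airports: A sample prompt.

Stateless chat

Send a stateless chat request with a data agent reference

The following sample code demonstrates how to ask the API a stateless question by using the data agent that you defined in previous steps. This sample uses the get_stream helper function to stream the response.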

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

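Replace the sample values as follows:

  • data_agent_1: The ID of the data agent, as defined in the sample code block in "Create a data agent".
  • Make a bar graph for the top 5 states by the total number of airports: A sample prompt.

Send a stateless chat request with inline context

The following sample code demonstrates how to ask the API a stateless question by using inline context. This sample uses the get_stream helper function to stream the response and uses a BigQuery data source as an example.

You can also optionally enable advanced analysis with Python by including the options parameter in the request payload. For more information about the options parameter and the options that you can configure for a conversation, see the "REST Resource: projects.locations.dataAgents" page.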

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

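Create a stateless multi-turn conversation

To ask follow-up questions in a stateless conversation, your application must manage the conversation's context by sending the entire message history with each new request. The following sections show how to define and call helper functions to create a multi-turn conversation.

Send multi-turn requests

The following multi_turn_Conversation helper function manages conversation context by storing messages in a list. This lets you send follow-up questions that build on previous turns. In the function's payload, you can either reference a data agent or provide the data source directly by using inline context.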

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

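In the preceding example, replace data_agent_1 with the ID of the data agent, as defined in the sample code block in "Create a data agent".

You can call the multi_turn_Conversation helper function for each turn of the conversation. The following sample code shows how to send an initial request and then a follow-up request that builds on the previous response.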

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

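In the preceding example, replace the sample values as follows:

  • Which species of tree is most prevalent?: A natural language question to send to the data agent.
  • Can you show me the results as a bar chart?: A follow-up question that builds on or refines the previous question.

Process responses

The following get_stream_multi_turn function processes the streaming API response. This function is similar to the get_stream helper function, but it stores responses in the conversation_messages list to save the conversation context for the next turn.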

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''
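
The line-by-line accumulation inside get_stream_multi_turn can be studied in isolation. The sketch below pulls that framing logic into a generator that can be tested without a network call. The name iter_json_objects is hypothetical, and the framing it assumes (a pretty-printed JSON array that opens with `[{`, separates objects with a lone `,`, and closes with `}]`) is inferred from the function above rather than from a specification.

```python
import json


def iter_json_objects(lines):
    """Yield each complete JSON object from a streamed JSON array.

    lines is an iterable of str or bytes, such as resp.iter_lines().
    """
    acc = ""
    for raw in lines:
        if not raw:
            continue  # Skip keep-alive blank lines
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw

        if line == "[{":
            acc = "{"        # Start of the array and of the first object
        elif line == "}]":
            acc += "}"       # End of the last object and of the array
        elif line == ",":
            continue         # Separator between objects
        else:
            acc += line

        try:
            obj = json.loads(acc)
        except ValueError:
            continue         # Object is not complete yet
        yield obj
        acc = ""
```

Each yielded dictionary corresponds to one data_json value in the function above, so the same handlers can be applied to it.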

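End-to-end code sample

The following code sample contains all the tasks that are covered in this guide. The chat requests call the get_stream helper function, so define the helper functions from the last section of this page before you run them.

Build a data agent using HTTP and Python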

    from pygments import highlight, lexers, formatters
    import pandas as pd
    import json as json_lib
    import requests
    import json
    import altair as alt
    import IPython
    from IPython.display import display, HTML
    import google.auth
    from google.auth.transport.requests import Request

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
            # explore_references is a list with one entry per Explore
            "explore_references": [
                {
                    "looker_instance_uri": "https://my_company.looker.com",
                    "lookml_model": "my_model",
                    "explore": "my_explore",
                },
            ],
            # Do not include the following line during agent creation
            # "credentials": looker_credentials
        }
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references":
            [
                {
                "datasource_id": "your_studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation_1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
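
The multi-turn helper above relies on a single shared list that grows with every turn: each user message is appended before the request, and each streamed reply is appended afterward. The following is a minimal offline sketch of that bookkeeping, not part of the official sample. `fake_stream` and `send_turn` are hypothetical names, no network call is made, and the reply shape is only assumed from the fields that the helper functions read.

```python
# Offline sketch of multi-turn history tracking (no API call is made).
conversation_messages = []

def fake_stream(payload):
    """Hypothetical stand-in for the streaming call; returns one canned reply."""
    last_question = payload["messages"][-1]["userMessage"]["text"]
    return [{"systemMessage": {"text": {"parts": [f"Answer to: {last_question}"]}}}]

def send_turn(question):
    # Add the new user message to the shared history
    conversation_messages.append({"userMessage": {"text": question}})
    # Snapshot the history so the payload shows exactly what this turn sent
    payload = {"messages": list(conversation_messages)}
    # Store every reply so that the next turn includes it as context
    for reply in fake_stream(payload):
        conversation_messages.append(reply)
    return payload

first = send_turn("Which species of tree is most prevalent?")
second = send_turn("Can you show me the results as a bar chart?")
print(len(first["messages"]), len(second["messages"]))  # prints: 1 3
```

The second request carries three messages (first question, first reply, follow-up question), which is what lets the agent resolve "the results" in the follow-up.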
    

The following code sample contains the Python helper functions that are used to stream chat responses.

Helper Python functions for streaming chat responses

    def is_json(text):
      # Return True only when the accumulated text parses as complete JSON
      try:
          json_lib.loads(text)
      except ValueError:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      print(''.join(parts))

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": [get_property(field, 'name') for field in fields],
        "Type": [get_property(field, 'type') for field in fields],
        "Description": [get_property(field, 'description', '-') for field in fields],
        "Mode": [get_property(field, 'mode') for field in fields]
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        print('Question: {}'.format(query['question']))
        print('Data sources:')
        for datasource in query['datasources']:
          display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

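        fields = [get_property(field, 'name') for field in resp['result']['schema']['fields']]
        columns = {}

        # Build each column eagerly; a lazy map() with a lambda would read the
        # loop variable only after the loop ends and fill every column with
        # the values of the last field
        for field in fields:
          columns[field] = [get_property(el, field) for el in resp['result']['data']]

        display(pd.DataFrame(columns))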

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display()

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if 'systemMessage' not in data_json:
            if 'error' in data_json:
              handle_error(data_json['error'])
            # Reset the accumulator so that later messages still parse
            acc = ''
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
          print('\n')
          # Reset the accumulator after every handled message, not only in the fallback branch
          acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if 'systemMessage' not in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    # Reset the accumulator so that later messages still parse
                    acc = ''
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''
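
To see how the accumulator in `get_stream` reassembles messages, the parsing step can be exercised offline. The following is a minimal sketch, not part of the official sample. `parse_stream_lines` is a hypothetical helper, and the sample lines assume only the framing that the helper functions check for: `[{` opens the array, a lone `,` separates objects, and `}]` closes the array.

```python
import json

def parse_stream_lines(lines):
    """Yield one dict per complete JSON object found in the framed stream."""
    acc = ''
    for line in lines:
        if not line:
            continue
        if line == '[{':
            acc = '{'          # Start of the array: keep only the object brace
        elif line == '}]':
            acc += '}'         # End of the array: keep only the object brace
        elif line == ',':
            continue           # Separator between two objects
        else:
            acc += line
        try:
            message = json.loads(acc)
        except ValueError:
            continue           # Object is still incomplete; keep accumulating
        yield message
        acc = ''               # Reset so that the next object starts clean

# Assumed sample of streamed lines, for illustration only
sample_lines = [
    '[{',
    '  "systemMessage": {',
    '    "text": {"parts": ["Hello"]}',
    '  }',
    '}',
    ',',
    '{',
    '  "systemMessage": {',
    '    "text": {"parts": ["World"]}',
    '  }',
    '}]',
]

messages = list(parse_stream_lines(sample_lines))
for message in messages:
    print(''.join(message['systemMessage']['text']['parts']))
```

Resetting the accumulator after each complete object is what allows the second message to parse; without the reset, the leftover text from the first object would make every later parse attempt fail.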