使用 Python SDK 构建数据代理

本页面介绍了如何使用 Python SDK 向 Conversational Analytics API 发出请求。示例 Python 代码演示了如何完成以下任务:

进行身份验证并设置环境

如需使用适用于 Conversational Analytics API 的 Python SDK,请按照 Conversational Analytics API SDK Colaboratory 笔记本中的说明下载并安装该 SDK。请注意,SDK Colab 的下载方法及内容可能会发生变化。

完成笔记本中的设置说明部分后,您便可以使用以下代码导入所需的 SDK 库,在 Colaboratory 环境中对您的 Google 账号进行身份验证,并初始化一个用于发出 API 请求的客户端:

from google.colab import auth
auth.authenticate_user()

from google.cloud import geminidataanalytics

data_agent_client = geminidataanalytics.DataAgentServiceClient()
data_chat_client = geminidataanalytics.DataChatServiceClient()

指定结算项目和系统指令

以下示例 Python 代码定义了在整个脚本中使用的结算项目和系统指令:

# Billing project
billing_project = "my_project_name"

# System instructions
system_instruction = "Help the user analyze their data."

按如下所示替换示例值:

  • my_project_name已启用所需 API 的结算项目的 ID。
  • Help the user analyze their data.:用于指导智能体行为并根据您的数据需求自定义智能体的系统指令。例如,您可以使用系统指令来定义业务术语、控制回答长度或设置数据格式。最好使用撰写有效的系统指令部分推荐的 YAML 格式定义系统指令,以提供详尽的结构化指导。

连接到数据源

以下部分介绍了如何为代理的数据源定义连接详细信息。智能体可以连接到 LookerBigQueryLooker Studio 中的数据。

连接到 Looker 数据

以下代码示例展示了如何使用 API 密钥或访问令牌定义与 Looker 探索的连接的详细信息。 您一次最多可以使用 Conversational Analytics API 连接到 5 个 Looker 探索。

连接到 Looker 数据源时,请注意以下事项:

  • 您可以在对话中查询任何包含的探索。
  • 代理一次只能查询一个探索。无法同时对多个探索进行查询。
  • 在同一对话中,代理可以查询多个探索。
  • 在包含多部分问题的对话中,或者在包含后续问题的对话中,代理可以在一次对话中查询多个探索。

    例如:某用户连接了两个探索,一个名为 cat-explore,另一个名为 dog-explore。用户输入问题“猫的数量多还是狗的数量多?”这将创建两个查询:一个用于统计 cat-explore 中的猫数,另一个用于统计 dog-explore 中的狗数。在完成这两个查询后,代理会比较这两个查询中的数字。

API 密钥

您可以按照使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述,使用生成的 Looker API 密钥与 Looker 实例建立连接。

looker_client_id = "my_looker_client_id"
looker_client_secret = "my_looker_client_secret"
looker_instance_uri = "https://my_company.looker.com"
lookml_model_1 = "my_model"
explore_1 = "my_explore"
lookml_model_2 = "my_model_2"
explore_2 = "my_explore_2"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = "my_model"
looker_explore_reference.explore = "my_explore"

looker_explore_reference2 = geminidataanalytics.LookerExploreReference()
looker_explore_reference2.looker_instance_uri = looker_instance_uri
looker_explore_reference2.lookml_model = "my_model_2"
looker_explore_reference2.explore = "my_explore_2"

credentials = geminidataanalytics.Credentials()
credentials.oauth.secret.client_id = looker_client_id
credentials.oauth.secret.client_secret = looker_client_secret

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference, looker_explore_reference2]

# Do not include the following line during agent creation
datasource_references.credentials = credentials

按如下所示替换示例值:

  • my_looker_client_id:您生成的 Looker API 密钥的客户端 ID。
  • my_looker_client_secret:您生成的 Looker API 密钥的客户端密钥。
  • https://my_company.looker.com:Looker 实例的完整网址。
  • my_model:包含您要连接的探索的 LookML 模型的名称。
  • my_explore:您希望数据代理查询的 Looker 探索的名称。
  • my_model_2:包含您要连接的探索的第二个 LookML 模型的名称。您可以重复使用此变量,为最多 5 个探索添加其他模型。
  • my_explore_2:您希望数据代理查询的其他 Looker 探索的名称。您可以重复使用此变量,最多可添加 5 个探索。

访问令牌

您可以按照使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述,使用访问令牌与 Looker 实例建立连接。

looker_access_token = "my_access_token"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"
lookml_model_2 = "my_model_2"
explore_2 = "my_explore_2"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = "my_model"
looker_explore_reference.explore = "my_explore"

looker_explore_reference2 = geminidataanalytics.LookerExploreReference()
looker_explore_reference2.looker_instance_uri = looker_instance_uri
looker_explore_reference2.lookml_model = "my_model_2"
looker_explore_reference2.explore = "my_explore_2"

credentials = geminidataanalytics.Credentials()
credentials.oauth.secret.client_id = looker_client_id
credentials.oauth.secret.client_secret = looker_client_secret

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference, looker_explore_reference2]

# Do not include the following line during agent creation
datasource_references.credentials = credentials

按如下所示替换示例值:

  • my_access_token:您生成的用于向 Looker 进行身份验证的 access_token 值。
  • https://my_company.looker.com:Looker 实例的完整网址。
  • my_model:包含您要连接的探索的 LookML 模型的名称。
  • my_explore:您希望数据代理查询的 Looker 探索的名称。
  • my_model_2:包含您要连接的探索的第二个 LookML 模型的名称。您可以重复使用此变量,为最多 5 个探索添加其他模型。
  • my_explore_2:您希望数据代理查询的其他 Looker 探索的名称。您可以重复使用此变量,最多可添加 5 个探索。

连接到 BigQuery 数据

借助 Conversational Analytics API,您可以连接的 BigQuery 表的数量没有硬性限制。不过,连接到大量表格可能会降低准确率,或者导致您超出模型的输入令牌限制。

以下示例代码定义了与多个 BigQuery 表的连接,并包含可选的结构化上下文字段示例。为了提高代理的性能,您可以选择性地为 BigQuery 表提供结构化上下文,例如表和列说明、同义词、标记和示例查询。如需了解详情,请参阅为 BigQuery 数据源定义数据代理上下文

bigquery_table_reference_1 = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference_1.project_id = "my_project_id_1"
bigquery_table_reference_1.dataset_id = "my_dataset_id_1"
bigquery_table_reference_1.table_id = "my_table_id_1"
bigquery_table_reference_1.schema = geminidataanalytics.Schema()
bigquery_table_reference_1.schema.description = "my_table_description"
bigquery_table_reference_1.schema.fields = [
    geminidataanalytics.Field(
        name="my_column_name",
        description="my_column_description"
    )
]

bigquery_table_reference_2 = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference_2.project_id = "my_project_id_2"
bigquery_table_reference_2.dataset_id = "my_dataset_id_2"
bigquery_table_reference_2.table_id = "my_table_id_2"

bigquery_table_reference_3 = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference_3.project_id = "my_project_id_3"
bigquery_table_reference_3.dataset_id = "my_dataset_id_3"
bigquery_table_reference_3.table_id = "my_table_id_3"

# Connect to your data source
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.bq.table_references = [bigquery_table_reference_1, bigquery_table_reference_2, bigquery_table_reference_3]

按如下所示替换示例值:

  • my_project_id_1:包含您要连接到的 BigQuery 数据集和表的 Google Cloud 项目的 ID。如需连接到公共数据集,请指定 bigquery-public-data
  • my_dataset_id_1:BigQuery 数据集的 ID。 例如 san_francisco
  • my_table_id_1:BigQuery 表的 ID。例如 street_trees
  • my_table_description:对表内容和用途的可选说明。
  • my_column_name:您要为其提供可选说明的表中的列的名称。
  • my_column_description:列内容和用途的可选说明。

连接到 Looker Studio 数据

以下示例代码定义了与 Looker Studio 数据源的连接。

studio_datasource_id = "my_datasource_id"

studio_references = geminidataanalytics.StudioDatasourceReference()
studio_references.datasource_id = studio_datasource_id

## Connect to your data source
datasource_references.studio.studio_references = [studio_references]

在上面的示例中,将 my_datasource_id 替换为数据源 ID。

为有状态或无状态聊天设置上下文

Conversational Analytics API 支持多轮对话,让用户可以提出基于之前的上下文的后续问题。以下示例 Python 代码演示了如何为有状态或无状态聊天设置上下文:

  • 有状态聊天: Google Cloud 会存储和管理对话历史记录。有状态聊天本身就是多轮聊天,因为 API 会保留之前消息中的上下文。您只需在每轮中发送当前消息。
  • 无状态聊天:您的应用会管理对话历史记录。您必须在每条新消息中包含完整的对话历史记录。如需查看有关如何在无状态模式下管理多轮对话的详细示例,请参阅创建无状态多轮对话

有状态聊天

以下代码示例为有状态聊天设置了上下文,其中 Google Cloud 会存储和管理对话历史记录。您还可以选择在以下示例代码中添加 published_context.options.analysis.python.enabled = True 行,以启用使用 Python 进行高级分析。

# Set up context for stateful chat
published_context = geminidataanalytics.Context()
published_context.system_instruction = system_instruction
published_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
published_context.options.analysis.python.enabled = True

无状态聊天

以下示例代码为无状态聊天设置了上下文,在这种聊天中,您必须在每条消息中发送完整的对话历史记录。您还可以选择在以下示例代码中添加 inline_context.options.analysis.python.enabled = True 行,以启用使用 Python 进行高级分析。

# Set up context for stateless chat
# datasource_references.looker.credentials = credentials
inline_context = geminidataanalytics.Context()
inline_context.system_instruction = system_instruction
inline_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
inline_context.options.analysis.python.enabled = True

创建数据智能体

以下示例 Python 代码会发出 API 请求来创建数据代理,然后您可以使用该代理针对数据进行对话。该数据代理配置为使用指定的数据源、系统指令和上下文。

data_agent_id = "data_agent_1"

data_agent = geminidataanalytics.DataAgent()
data_agent.data_analytics_agent.published_context = published_context
data_agent.name = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}" # Optional

request = geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{billing_project}/locations/global",
    data_agent_id=data_agent_id, # Optional
    data_agent=data_agent,
)

try:
    data_agent_client.create_data_agent(request=request)
    print("Data Agent created")
except Exception as e:
    print(f"Error creating Data Agent: {e}")

在上面的示例中,将值 data_agent_1 替换为数据代理的唯一标识符。

创建对话

以下示例 Python 代码会发出 API 请求来创建对话。

# Initialize request arguments
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation = geminidataanalytics.Conversation()
conversation.agents = [f'projects/{billing_project}/locations/global/dataAgents/{data_agent_id}']
conversation.name = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"

request = geminidataanalytics.CreateConversationRequest(
    parent=f"projects/{billing_project}/locations/global",
    conversation_id=conversation_id,
    conversation=conversation,
)

# Make the request
response = data_chat_client.create_conversation(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
  • conversation_1:对话的唯一标识符。

管理数据智能体及对话

以下代码示例展示了如何使用 Conversational Analytics API 管理数据智能体及对话。您可以执行以下任务:

获取数据智能体

以下示例 Python 代码演示了如何发出 API 请求来检索您之前创建的数据智能体。

# Initialize request arguments
data_agent_id = "data_agent_1"
request = geminidataanalytics.GetDataAgentRequest(
    name=f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
)

# Make the request
response = data_agent_client.get_data_agent(request=request)

# Handle the response
print(response)

在上面示例中,将值 data_agent_1 替换为您要检索的数据代理的唯一标识符。

列出数据智能体

以下代码演示了如何通过调用 list_data_agents 方法来列出给定项目的所有数据智能体。如需列出所有智能体,您必须拥有项目的 geminidataanalytics.dataAgents.list 权限。如需详细了解有哪些 IAM 角色包含此权限,请参阅预定义角色列表。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
request = geminidataanalytics.ListDataAgentsRequest(
    parent=f"projects/{billing_project}/locations/global",
)

# Make the request
page_result = data_agent_client.list_data_agents(request=request)

# Handle the response
for response in page_result:
    print(response)

YOUR-BILLING-PROJECT 替换为您的结算项目的 ID。

列出可访问的数据代理

以下代码演示了如何通过调用 list_accessible_data_agents 方法来列出给定项目的所有可访问的数据代理。

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
location = "global"
request = geminidataanalytics.ListAccessibleDataAgentsRequest(
    parent=f"projects/{billing_project}/locations/global",
    creator_filter=creator_filter
)

# Make the request
page_result = data_agent_client.list_accessible_data_agents(request=request)

# Handle the response
for response in page_result:
    print(response)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • YOUR-CREATOR-FILTER:要根据数据代理的创建者应用的过滤条件。可能的值包括 NONE(默认)、CREATOR_ONLYNOT_CREATOR_ONLY

更新数据智能体

以下示例代码演示了如何通过对数据智能体资源调用 update_data_agent 方法来更新相应数据智能体。该请求需要一个 DataAgent 对象,其中包含您要更改的字段的新值;还需要一个 update_mask 参数,该参数接收一个 FieldMask 对象作为输入来指定要更新的字段。

如需更新数据智能体,您必须拥有该智能体的 geminidataanalytics.dataAgents.update IAM 权限。如需详细了解有哪些 IAM 角色包含此权限,请参阅预定义角色列表。

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
data_agent = geminidataanalytics.DataAgent()
data_agent.data_analytics_agent.published_context = published_context
data_agent.name = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
data_agent.description = "Updated description of the data agent."

update_mask = field_mask_pb2.FieldMask(paths=['description', 'data_analytics_agent.published_context'])

request = geminidataanalytics.UpdateDataAgentRequest(
    data_agent=data_agent,
    update_mask=update_mask,
)

try:
    # Make the request
    data_agent_client.update_data_agent(request=request)
    print("Data Agent Updated")
except Exception as e:
    print(f"Error updating Data Agent: {e}")

按如下所示替换示例值:

  • data_agent_1:要更新的数据智能体的 ID。
  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • Updated description of the data agent.:更新后的数据智能体的说明。

获取数据智能体的 IAM 政策

以下示例代码演示了如何使用 get_iam_policy 方法来获取数据智能体的 IAM 政策。该请求指定了数据智能体资源路径。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

resource = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
request = iam_policy_pb2.GetIamPolicyRequest(
            resource=resource,
        )
try:
    response = data_agent_client.get_iam_policy(request=request)
    print("IAM Policy fetched successfully!")
    print(f"Response: {response}")
except Exception as e:
    print(f"Error setting IAM policy: {e}")

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要获取其 IAM 政策的数据智能体的 ID。

为数据智能体设置 IAM 政策

如需共享智能体,您可以使用 set_iam_policy 方法向用户分配特定智能体的 IAM 角色。该请求包含用于指定应向哪些用户分配哪些角色的绑定。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

resource = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"

# Construct the IAM policy
binding = policy_pb2.Binding(
    role=role,
    members= [f"user:{i.strip()}" for i in users.split(",")]
)

policy = policy_pb2.Policy(bindings=[binding])

# Create the request
request = iam_policy_pb2.SetIamPolicyRequest(
    resource=resource,
    policy=policy
)

# Send the request
try:
    response = data_agent_client.set_iam_policy(request=request)
    print("IAM Policy set successfully!")
    print(f"Response: {response}")
except Exception as e:
    print(f"Error setting IAM policy: {e}")

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要为其设置 IAM 政策的数据智能体的 ID。
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com:要向其授予指定角色的用户的邮箱列表(以英文逗号分隔)。

删除数据智能体

以下示例代码演示了如何使用 delete_data_agent 方法来软删除数据智能体。软删除智能体时,虽然该智能体会被删除,但在 30 天内仍可检索到。该请求指定了数据智能体资源网址。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

request = geminidataanalytics.DeleteDataAgentRequest(
    name=f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
)

try:
    # Make the request
    data_agent_client.delete_data_agent(request=request)
    print("Data Agent Deleted")
except Exception as e:
    print(f"Error deleting Data Agent: {e}")

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要删除的数据智能体的 ID。

获取对话

以下示例代码演示了如何使用 get_conversation 方法来获取有关现有对话的信息。该请求指定了对话资源路径。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

request = geminidataanalytics.GetConversationRequest(
    name = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"
)

# Make the request
response = data_chat_client.get_conversation(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要提取的对话的 ID。

列出对话

以下示例代码演示了如何通过调用 list_conversations 方法来列出给定项目的对话。该请求指定了父级资源网址,即项目和位置(例如 projects/my-project/locations/global)。

默认情况下,此方法会返回您创建的对话。管理员(具有 cloudaicompanion.topicAdmin IAM 角色的用户)可以查看项目中的所有对话。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
request = geminidataanalytics.ListConversationsRequest(
    parent=f"projects/{billing_project}/locations/global",
)

# Make the request
response = data_chat_client.list_conversations(request=request)

# Handle the response
print(response)

YOUR-BILLING-PROJECT 替换为启用了所需 API 的结算项目的 ID。

列出对话中的消息

以下示例代码演示了如何使用 list_messages 方法来提取对话中的所有消息。该请求指定了对话资源路径。

如需列出消息,您必须拥有对话的 cloudaicompanion.topics.get 权限

billing_project = "YOUR-BILLING-PROJECT"
location = "global"

conversation_id = "conversation_1"

request = geminidataanalytics.ListMessagesRequest(
    parent=f"projects/{billing_project}/locations/global/conversations/{conversation_id}",
)

# Make the request
response = data_chat_client.list_messages(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要列出其消息的对话的 ID。

使用 API 提问

创建数据代理对话后,以下示例 Python 代码会向代理发送查询。该代码使用您为有状态或无状态聊天设置的上下文。API 会返回一个消息流,表示代理回答查询所执行的步骤。

有状态聊天

发送包含 Conversation 引用的有状态聊天请求

您可以通过引用之前创建的 Conversation 资源,向数据代理发送有状态的聊天请求。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Create a conversation_reference
conversation_reference = geminidataanalytics.ConversationReference()
conversation_reference.conversation = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"
conversation_reference.data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# conversation_reference.data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent = f"projects/{billing_project}/locations/global",
    messages = messages,
    conversation_reference = conversation_reference
)

# Make the request
stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

# Handle the response
for response in stream:
    show_message(response)

按如下所示替换示例值:

  • Which species of tree is most prevalent?:要发送给数据智能体的自然语言问题。
  • data_agent_1:数据代理的唯一标识符,如创建数据代理中所定义。
  • conversation_1:对话的唯一标识符,如创建对话中所定义。

无状态聊天

以下代码示例演示了如何在为无状态聊天设置上下文时向数据代理发送查询。您可以通过引用之前定义的 DataAgent 资源或在请求中使用内嵌上下文来发送无状态查询。

发送包含 DataAgent 引用的无状态聊天请求

您可以通过引用之前创建的 DataAgent 资源,向数据代理发送请求。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"

data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
    data_agent_context = data_agent_context
)

# Make the request
stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

# Handle the response
for response in stream:
    show_message(response)

按如下所示替换示例值:

  • Which species of tree is most prevalent?:要发送给数据智能体的自然语言问题。
  • data_agent_1:数据代理的唯一标识符,如创建数据代理中所定义。

发送包含内嵌上下文的无状态对话请求

以下示例代码演示了如何使用 inline_context 参数直接在无状态聊天请求中提供上下文。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

request = geminidataanalytics.ChatRequest(
    inline_context=inline_context,
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
)

# Make the request
stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

# Handle the response
for response in stream:
    show_message(response)

在上面的示例中,将 Which species of tree is most prevalent? 替换为要发送给数据代理的自然语言问题。

创建无状态多轮对话

如需在无状态对话中提出后续问题,您的应用必须通过在每次新请求中发送完整的消息历史记录来管理对话的上下文。以下示例展示了如何通过引用数据代理或使用内嵌上下文直接提供数据源来创建多轮对话。

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Use data agent context
data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Helper function for calling the API
def multi_turn_Conversation(msg):

    message = geminidataanalytics.Message()
    message.user_message.text = msg

    # Send a multi-turn request by including previous turns and the new message
    conversation_messages.append(message)

    request = geminidataanalytics.ChatRequest(
        parent=f"projects/{billing_project}/locations/global",
        messages=conversation_messages,
        # Use data agent context
        data_agent_context=data_agent_context,
        # Use inline context
        # inline_context=inline_context,
    )

    # Make the request
    stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

    # Handle the response
    for response in stream:
      show_message(response)
      conversation_messages.append(response)

# Send the first turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

在上面的示例中,按如下所示替换示例值:

  • data_agent_1:数据代理的唯一标识符,如创建数据代理部分的示例代码块中所定义。
  • Which species of tree is most prevalent?:要发送给数据智能体的自然语言问题。
  • Can you show me the results as a bar chart?:基于或优化上一个问题的后续问题。

定义辅助函数

以下示例代码包含前面代码示例中使用的辅助函数定义。这些函数有助于解析来自 API 的回答并显示结果。

from pygments import highlight, lexers, formatters
import pandas as pd
import requests
import json as json_lib
import altair as alt
import IPython
from IPython.display import display, HTML

import proto
from google.protobuf.json_format import MessageToDict, MessageToJson

def handle_text_response(resp):
  parts = getattr(resp, 'parts')
  print(''.join(parts))

def display_schema(data):
  fields = getattr(data, 'fields')
  df = pd.DataFrame({
    "Column": map(lambda field: getattr(field, 'name'), fields),
    "Type": map(lambda field: getattr(field, 'type'), fields),
    "Description": map(lambda field: getattr(field, 'description', '-'), fields),
    "Mode": map(lambda field: getattr(field, 'mode'), fields)
  })
  display(df)

def display_section_title(text):
  display(HTML('<h2>{}</h2>'.format(text)))

def format_looker_table_ref(table_ref):
 return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref.lookml_model, table_ref.explore, table_ref.looker_instance_uri)

def format_bq_table_ref(table_ref):
  return '{}.{}.{}'.format(table_ref.project_id, table_ref.dataset_id, table_ref.table_id)

def display_datasource(datasource):
  source_name = ''
  if 'studio_datasource_id' in datasource:
   source_name = getattr(datasource, 'studio_datasource_id')
  elif 'looker_explore_reference' in datasource:
   source_name = format_looker_table_ref(getattr(datasource, 'looker_explore_reference'))
  else:
    source_name = format_bq_table_ref(getattr(datasource, 'bigquery_table_reference'))

  print(source_name)
  display_schema(datasource.schema)

def handle_schema_response(resp):
  if 'query' in resp:
    print(resp.query.question)
  elif 'result' in resp:
    display_section_title('Schema resolved')
    print('Data sources:')
    for datasource in resp.result.datasources:
      display_datasource(datasource)

def handle_data_response(resp):
  if 'query' in resp:
    query = resp.query
    display_section_title('Retrieval query')
    print('Query name: {}'.format(query.name))
    print('Question: {}'.format(query.question))
    print('Data sources:')
    for datasource in query.datasources:
      display_datasource(datasource)
  elif 'generated_sql' in resp:
    display_section_title('SQL generated')
    print(resp.generated_sql)
  elif 'result' in resp:
    display_section_title('Data retrieved')

    fields = [field.name for field in resp.result.schema.fields]
    d = {}
    for el in resp.result.data:
      for field in fields:
        if field in d:
          d[field].append(el[field])
        else:
          d[field] = [el[field]]

    display(pd.DataFrame(d))

def handle_chart_response(resp):
  def _value_to_dict(v):
    if isinstance(v, proto.marshal.collections.maps.MapComposite):
      return _map_to_dict(v)
    elif isinstance(v, proto.marshal.collections.RepeatedComposite):
      return [_value_to_dict(el) for el in v]
    elif isinstance(v, (int, float, str, bool)):
      return v
    else:
      return MessageToDict(v)

  def _map_to_dict(d):
    out = {}
    for k in d:
      if isinstance(d[k], proto.marshal.collections.maps.MapComposite):
        out[k] = _map_to_dict(d[k])
      else:
        out[k] = _value_to_dict(d[k])
    return out

  if 'query' in resp:
    print(resp.query.instructions)
  elif 'result' in resp:
    vegaConfig = resp.result.vega_config
    vegaConfig_dict = _map_to_dict(vegaConfig)
    alt.Chart.from_json(json_lib.dumps(vegaConfig_dict)).display();

def show_message(msg):
  m = msg.system_message
  if 'text' in m:
    handle_text_response(getattr(m, 'text'))
  elif 'schema' in m:
    handle_schema_response(getattr(m, 'schema'))
  elif 'data' in m:
    handle_data_response(getattr(m, 'data'))
  elif 'chart' in m:
    handle_chart_response(getattr(m, 'chart'))
  print('\n')