使用 Python SDK 构建数据智能体

本页面介绍了如何使用 Python SDK 向 Conversational Analytics API 发出请求。示例 Python 代码演示了如何完成以下任务:

进行身份验证并设置环境

如需使用适用于 Conversational Analytics API 的 Python SDK,请按照 Conversational Analytics API SDK Colaboratory 笔记本中的说明下载并安装该 SDK。请注意,SDK Colab 的下载方法及内容可能会发生变化。

完成笔记本中的设置说明部分后,您便可以使用以下代码导入所需的 SDK 库,在 Colaboratory 环境中对您的 Google 账号进行身份验证,并初始化一个用于发出 API 请求的客户端:

from google.colab import auth
auth.authenticate_user()

from google.cloud import geminidataanalytics

data_agent_client = geminidataanalytics.DataAgentServiceClient()
data_chat_client = geminidataanalytics.DataChatServiceClient()

指定结算项目和系统指令

以下示例 Python 代码定义了在整个脚本中使用的结算项目、位置和系统指令:

# Billing project
billing_project = "my_project_name"
location = "global"

# System instructions
system_instruction = "Help the user analyze their data."

按如下所示替换示例值:

  • my_project_name已启用所需 API 的结算项目的 ID。
  • Help the user analyze their data.:用于指导智能体行为并根据您的数据需求自定义智能体的系统指令。例如,您可以使用系统指令来定义业务术语、控制回答长度或设置数据格式。最好使用撰写有效的系统指令部分推荐的 YAML 格式定义系统指令,以提供详尽的结构化指导。

连接到数据源

以下部分介绍了如何为代理的数据源定义连接详细信息。您的代理可以通过以下方式连接到数据:

连接到 Looker 数据

以下代码示例展示了如何使用 API 密钥或访问令牌定义与 Looker 探索的连接的详细信息。 您最多可以同时使用 Conversational Analytics API 连接 5 个 Looker 探索。

连接到 Looker 数据源时,请注意以下事项:

  • 您可以在对话中查询任何包含的探索。
  • 代理一次只能查询一个探索。无法同时跨多个探索执行查询。
  • 在同一对话中,代理可以查询多个探索。
  • 在包含多部分问题的对话中,或者在包含后续问题的对话中,代理可以在一次对话中查询多个探索。

    例如:某用户连接了两个探索,一个名为 cat-explore,另一个名为 dog-explore。用户输入问题“猫的数量多还是狗的数量多?”这会创建两个查询:一个用于统计 cat-explore 中的猫数,另一个用于统计 dog-explore 中的狗数。在完成这两次查询后,代理会比较两次查询中的数字。

以下示例代码定义了与多个 Looker 探索的连接。为了提高代理的性能,您可以选择性地为探索提供黄金查询作为结构化上下文。如需了解详情,请参阅为 Looker 数据源定义数据代理上下文

API 密钥

您可以按照使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述,使用生成的 Looker API 密钥与 Looker 实例建立连接。

looker_client_id = "my_looker_client_id"
looker_client_secret = "my_looker_client_secret"
looker_instance_uri = "https://my_company.looker.com"
lookml_model_1 = "my_model"
explore_1 = "my_explore"
lookml_model_2 = "my_model_2"
explore_2 = "my_explore_2"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = "my_model"
looker_explore_reference.explore = "my_explore"

looker_explore_reference2 = geminidataanalytics.LookerExploreReference()
looker_explore_reference2.looker_instance_uri = looker_instance_uri
looker_explore_reference2.lookml_model = "my_model_2"
looker_explore_reference2.explore = "my_explore_2"

credentials = geminidataanalytics.Credentials()
credentials.oauth.secret.client_id = looker_client_id
credentials.oauth.secret.client_secret = looker_client_secret

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference, looker_explore_reference2]

# Do not include the following line during agent creation
datasource_references.credentials = credentials

按如下所示替换示例值:

  • my_looker_client_id:您生成的 Looker API 密钥的客户端 ID。
  • my_looker_client_secret:您生成的 Looker API 密钥的客户端密钥。
  • https://my_company.looker.com:Looker 实例的完整网址。
  • my_model:包含您要连接的探索的 LookML 模型的名称。
  • my_explore:您希望数据代理查询的 Looker 探索的名称。
  • my_model_2:包含您要连接的探索的第二个 LookML 模型的名称。您可以针对其他模型重复使用此变量,最多可用于五个探索。
  • my_explore_2:您希望数据代理查询的其他 Looker 探索的名称。您可以重复使用此变量,最多可添加 5 个探索。

访问令牌

您可以按照使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述,使用访问令牌与 Looker 实例建立连接。

looker_access_token = "my_access_token"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"
lookml_model_2 = "my_model_2"
explore_2 = "my_explore_2"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = "my_model"
looker_explore_reference.explore = "my_explore"

looker_explore_reference2 = geminidataanalytics.LookerExploreReference()
looker_explore_reference2.looker_instance_uri = looker_instance_uri
looker_explore_reference2.lookml_model = "my_model_2"
looker_explore_reference2.explore = "my_explore_2"

credentials = geminidataanalytics.Credentials()
credentials.oauth.secret.client_id = looker_client_id
credentials.oauth.secret.client_secret = looker_client_secret

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference, looker_explore_reference2]

# Do not include the following line during agent creation
datasource_references.credentials = credentials

按如下所示替换示例值:

  • my_access_token:您生成的用于向 Looker 进行身份验证的 access_token 值。
  • https://my_company.looker.com:Looker 实例的完整网址。
  • my_model:包含您要连接的探索的 LookML 模型的名称。
  • my_explore:您希望数据代理查询的 Looker 探索的名称。
  • my_model_2:包含您要连接的探索的第二个 LookML 模型的名称。您可以针对其他模型重复使用此变量,最多可用于五个探索。
  • my_explore_2:您希望数据代理查询的其他 Looker 探索的名称。您可以重复使用此变量,最多可添加 5 个探索。

连接到 BigQuery 数据

借助 Conversational Analytics API,您可以连接的 BigQuery 表的数量没有硬性限制。不过,连接到大量表格可能会降低准确率,或者导致您超出模型的输入令牌限制。

以下示例代码定义了与多个 BigQuery 表的连接,并包含可选的结构化上下文字段的示例。为了提高代理的性能,您可以选择性地为 BigQuery 表提供结构化上下文,例如表和列的说明、同义词、标记和示例查询。如需了解详情,请参阅为 BigQuery 数据源定义数据代理上下文

bigquery_table_reference_1 = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference_1.project_id = "my_project_id_1"
bigquery_table_reference_1.dataset_id = "my_dataset_id_1"
bigquery_table_reference_1.table_id = "my_table_id_1"
bigquery_table_reference_1.schema = geminidataanalytics.Schema()
bigquery_table_reference_1.schema.description = "my_table_description"
bigquery_table_reference_1.schema.fields = [
  geminidataanalytics.Field(
    name="my_column_name",
    description="my_column_description"
  )
]

bigquery_table_reference_2 = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference_2.project_id = "my_project_id_2"
bigquery_table_reference_2.dataset_id = "my_dataset_id_2"
bigquery_table_reference_2.table_id = "my_table_id_2"

bigquery_table_reference_3 = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference_3.project_id = "my_project_id_3"
bigquery_table_reference_3.dataset_id = "my_dataset_id_3"
bigquery_table_reference_3.table_id = "my_table_id_3"

# Connect to your data source
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.bq.table_references = [bigquery_table_reference_1, bigquery_table_reference_2, bigquery_table_reference_3]

按如下所示替换示例值:

  • my_project_id_1:包含您要连接到的 BigQuery 数据集和表的 Google Cloud 项目的 ID。如需连接到公共数据集,请指定 bigquery-public-data
  • my_dataset_id_1:BigQuery 数据集的 ID。 例如 san_francisco
  • my_table_id_1:BigQuery 表的 ID。例如 street_trees
  • my_table_description:对表内容和用途的可选说明。
  • my_column_name:表中要为其提供可选说明的列的名称。
  • my_column_description:对列内容和用途的可选说明。

连接到数据洞察数据

以下示例代码定义了与数据洞察数据源的连接。

studio_datasource_id = "my_datasource_id"

studio_references = geminidataanalytics.StudioDatasourceReference()
studio_references.datasource_id = studio_datasource_id

## Connect to your data source
datasource_references.studio.studio_references = [studio_references]

在上面的示例中,将 my_datasource_id 替换为数据源 ID。

连接到 AlloyDB 数据

以下示例代码使用 Python SDK 定义了与 AlloyDB 数据库的连接。

# Define the AlloyDB database reference
alloydb_database_reference = geminidataanalytics.AlloyDbDatabaseReference()
alloydb_database_reference.project_id = "PROJECT_ID"
alloydb_database_reference.region = "REGION"
alloydb_database_reference.cluster_id = "CLUSTER_ID"
alloydb_database_reference.instance_id = "INSTANCE_ID"
alloydb_database_reference.database_id = "DATABASE"
alloydb_database_reference.table_ids = ["TABLE_1_ID", "TABLE_2_ID"]

# Optional: Include this if you have created advanced context for the agent
agent_context_reference = geminidataanalytics.AgentContextReference()
agent_context_reference.context_set_id = f"projects/{billing_project}/locations/{location}/contextSets/your_context_set_id"

# Add the AlloyDB reference to the DatasourceReferences object
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.alloydb.database_reference = alloydb_database_reference
# Optional:
# datasource_references.alloydb.agent_context_reference = agent_context_reference

按如下所示替换示例值:

  • PROJECT_ID:包含 AlloyDB 集群的 Google Cloud 项目的 ID。
  • REGION:AlloyDB 集群所在的区域,例如 us-central1
  • CLUSTER_ID:AlloyDB 集群的 ID。
  • INSTANCE_ID:AlloyDB 实例的 ID。
  • DATABASE:目标数据库的名称,例如 financial
  • TABLE_1_IDTABLE_2_ID:数据代理建议使用的数据库中的表列表,例如 "loan""client""disp"
  • your_context_set_id:如果您为代理创建了高级上下文,则为上下文集的 ID。

连接到 Cloud SQL for MySQL 数据

如需连接到 Cloud SQL for MySQL 实例,您可以使用 geminidataanalytics SDK 中的 CloudSqlReference 类。这需要指定项目、实例、数据库,以及(可选)您希望代理考虑的特定表。

在连接到 Cloud SQL for MySQL 数据之前,请确保您已使用 Identity and Access Management (IAM) 向 Cloud SQL for MySQL 进行身份验证。如需了解详情,请参阅向 Cloud SQL for MySQL 和 Cloud SQL for PostgreSQL 进行身份验证

以下示例使用 Python SDK 定义了与 Cloud SQL for MySQL 数据库的连接。

# Import the necessary library
from google.cloud import geminidataanalytics

# Define the Cloud SQL database reference
cloud_sql_database_reference = geminidataanalytics.CloudSqlDatabaseReference(
    project_id="PROJECT_ID",
    instance_id="INSTANCE_ID",
    database_id="DATABASE_ID",
    # Optional: Specify table IDs to guide the agent
    table_ids=["TABLE_1_ID", "TABLE_2_ID"]
)

# Create a CloudSqlReference object
cloud_sql_reference = geminidataanalytics.CloudSqlReference(
    database_reference=cloud_sql_database_reference
    # Optional: Include this if you have created advanced context for the agent
    # agent_context_reference=geminidataanalytics.AgentContextReference(
    # context_set_id=f"projects/{billing_project}/locations/{location}/contextSets/your_context_set_id"
    # )
)

# Add the Cloud SQL reference to the DatasourceReferences object
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.cloud_sql = cloud_sql_reference

按如下所示替换示例值:

  • PROJECT_ID:包含 Cloud SQL for MySQL 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:Cloud SQL for MySQL 实例的 ID。
  • DATABASE_ID:实例中目标数据库的名称,例如 financial
  • TABLE_1_IDTABLE_2_ID:(可选)数据代理建议使用的数据库中的特定表名称列表,例如 "orders""customers"
  • your_context_set_id:如果您为代理创建了高级上下文,则为上下文集的 ID。

您可以在创建数据代理或与数据代理互动时使用 datasource_references 对象。

连接到 Cloud SQL for PostgreSQL 数据

如需连接到 Cloud SQL for PostgreSQL 实例,您可以使用 geminidataanalytics SDK 中的 CloudSqlReference 类。这需要指定项目、实例、数据库,以及(可选)您希望代理考虑的特定表。

在连接到 Cloud SQL for PostgreSQL 数据之前,请确保您已使用 IAM 向 Cloud SQL for PostgreSQL 进行身份验证。如需了解详情,请参阅向 Cloud SQL for MySQL 和 Cloud SQL for PostgreSQL 进行身份验证

以下示例使用 Python SDK 定义了与 Cloud SQL for PostgreSQL 数据库的连接。

# Import the necessary library
from google.cloud import geminidataanalytics

# Define the Cloud SQL database reference
cloud_sql_database_reference = geminidataanalytics.CloudSqlDatabaseReference(
    project_id="PROJECT_ID",
    instance_id="INSTANCE_ID",
    database_id="DATABASE_ID",
    # Optional: Specify table IDs to guide the agent
    table_ids=["TABLE_1_ID", "TABLE_2_ID"]
)

# Create a CloudSqlReference object
cloud_sql_reference = geminidataanalytics.CloudSqlReference(
    database_reference=cloud_sql_database_reference
    # Optional: Include this if you have created advanced context for the agent
    # agent_context_reference=geminidataanalytics.AgentContextReference(
    # context_set_id=f"projects/{billing_project}/locations/{location}/contextSets/your_context_set_id"
    # )
)

# Add the Cloud SQL reference to the DatasourceReferences object
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.cloud_sql = cloud_sql_reference

按如下所示替换示例值:

  • PROJECT_ID:包含 Cloud SQL for PostgreSQL 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:Cloud SQL for PostgreSQL 实例的 ID。
  • DATABASE_ID:实例中目标数据库的名称,例如 postgres
  • TABLE_1_IDTABLE_2_ID:(可选)建议数据代理使用的数据库中的特定表名称列表,例如 "users""products"
  • your_context_set_id:如果您为代理创建了高级上下文,则为上下文集的 ID。

您可以在创建数据代理或与数据代理互动时使用 datasource_references 对象。

连接到 Spanner 数据

如需连接到 Spanner 实例,您可以使用 geminidataanalytics SDK 中的 SpannerReference 类。这需要指定项目、实例、数据库,以及(可选)您希望代理考虑的特定表。

在连接到 Spanner 数据之前,请确保用户或服务账号具有 spanner.databaseReader IAM 角色。如需了解详情,请参阅应用 IAM 角色

以下 Python 代码示例使用 Python SDK 定义了与 Spanner 数据库的连接。

# Import the necessary library
from google.cloud import geminidataanalytics

# Define the Spanner database reference
spanner_database_reference = geminidataanalytics.SpannerDatabaseReference(
    project_id="PROJECT_ID",
    instance_id="INSTANCE_ID",
    database_id="DATABASE_ID",
    # Optional: Specify table IDs to guide the agent
    table_ids=["TABLE_1_ID", "TABLE_2_ID"]
)

# Create a SpannerReference object
spanner_reference = geminidataanalytics.SpannerReference(
    database_reference=spanner_database_reference
    # Optional: Include this if you have created advanced context for the agent
    # agent_context_reference=geminidataanalytics.AgentContextReference(
    # context_set_id=f"projects/{billing_project}/locations/{location}/contextSets/your_context_set_id"
    # )
)

# Add the Spanner reference to the DatasourceReferences object
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.spanner = spanner_reference

按如下所示替换示例值:

  • PROJECT_ID:包含 Spanner 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:Spanner 实例的 ID。
  • DATABASE_ID:实例中目标数据库的名称。
  • TABLE_1_IDTABLE_2_ID:(可选)建议数据代理使用的数据库中的特定表名称列表,例如 "Singers""Albums"
  • your_context_set_id:如果您为代理创建了高级上下文,则为上下文集的 ID。

您可以在创建数据代理或与数据代理互动时使用 datasource_references 对象。

为有状态或无状态聊天设置上下文

Conversational Analytics API 支持多轮对话,让用户可以提出基于之前的上下文的后续问题。以下示例 Python 代码演示了如何为有状态或无状态聊天设置上下文:

  • 有状态聊天: Google Cloud 会存储和管理对话历史记录。有状态聊天本身就是多轮聊天,因为 API 会保留之前消息中的上下文。您只需在每轮中发送当前消息。
  • 无状态聊天:您的应用会管理对话历史记录。您必须在每条新消息中包含完整的对话历史记录。如需查看有关如何在无状态模式下管理多轮对话的详细示例,请参阅创建无状态多轮对话

有状态聊天

以下代码示例为有状态聊天设置了上下文,其中 Google Cloud 会存储和管理对话历史记录。您还可以选择在以下示例代码中添加 published_context.options.analysis.python.enabled = True 行,以启用使用 Python 进行高级分析。

# Set up context for stateful chat
published_context = geminidataanalytics.Context()
published_context.system_instruction = system_instruction
published_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
published_context.options.analysis.python.enabled = True

无状态聊天

以下示例代码为无状态聊天设置了上下文,其中您必须在每条消息中发送完整的对话历史记录。您还可以选择在以下示例代码中添加 inline_context.options.analysis.python.enabled = True 行,以启用使用 Python 进行高级分析。

# Set up context for stateless chat
# datasource_references.looker.credentials = credentials
inline_context = geminidataanalytics.Context()
inline_context.system_instruction = system_instruction
inline_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
inline_context.options.analysis.python.enabled = True

创建数据代理

以下示例 Python 代码会发出 API 请求来创建数据代理,然后您可以使用该代理针对数据进行对话。您可以同步或异步创建代理。该数据代理配置为使用指定的数据源、系统指令和上下文。

您可以选择在创建时提供 kms_key,以使用 CMEK 保护数据代理。CMEK 仅适用于 Looker 数据源。如需了解详情,请参阅客户管理的加密密钥 (CMEK)

同步

  data_agent_id = "data_agent_1"

  # Optional: If using CMEK, replace the empty strings with your KMS key details.
  key_ring = ""    # KEY_RING_NAME
  key_name = ""    # KEY_NAME
  key_project = "" # KMS_PROJECT_ID (Defaults to billing_project if empty)

  data_agent = geminidataanalytics.DataAgent()
  data_agent.data_analytics_agent.published_context = published_context
  data_agent.name = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}" # Optional

  # Optional: Add CMEK key if details are provided
  if key_ring and key_name:
    if not key_project:
      key_project = billing_project
    kms_key_data_agent = f"projects/{key_project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{key_name}"
    data_agent.kms_key = kms_key_data_agent # Add for CMEK

  request = geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{billing_project}/locations/{location}",
    data_agent_id=data_agent_id, # Optional
    data_agent=data_agent,
  )

  try:
    response = data_agent_client.create_data_agent_sync(request=request)
    print("Data Agent created")
    print(response)
  except Exception as e:
    print(f"Error creating Data Agent: {e}")

异步

  data_agent_id = "data_agent_1"

  # Optional: If using CMEK, replace the empty strings with your KMS key details.
  key_ring = ""    # KEY_RING_NAME
  key_name = ""    # KEY_NAME
  key_project = "" # KMS_PROJECT_ID (Defaults to billing_project if empty)

  data_agent = geminidataanalytics.DataAgent()
  data_agent.data_analytics_agent.published_context = published_context
  data_agent.name = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}" # Optional

  # Optional: Add CMEK key if details are provided
  if key_ring and key_name:
    if not key_project:
      key_project = billing_project
    kms_key_data_agent = f"projects/{key_project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{key_name}"
    data_agent.kms_key = kms_key_data_agent # Add for CMEK

  request = geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{billing_project}/locations/{location}",
    data_agent_id=data_agent_id, # Optional
    data_agent=data_agent,
  )

  try:
    data_agent_client.create_data_agent(request=request)
    print("Data Agent created")
  except Exception as e:
    print(f"Error creating Data Agent: {e}")

在上面的示例中,按如下所示替换值:

  • data_agent_1:数据智能体的唯一标识符。
  • KEY_RING_NAME:如果使用 CMEK,则为 Cloud KMS 密钥环的名称。
  • KEY_NAME:如果使用 CMEK,则为 Cloud KMS 密钥的名称。
  • KMS_PROJECT_ID:如果使用 CMEK,则为密钥托管所在的项目 ID。

创建对话

以下示例 Python 代码会发出 API 请求来创建对话。

您可以选择在创建时提供 kms_key,以使用 CMEK 保护对话。CMEK 仅适用于 Looker 数据源。如需了解详情,请参阅客户管理的加密密钥 (CMEK)

# Initialize request arguments
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"


# Optional: If using CMEK, replace the empty strings with your KMS key details.
key_ring = ""    # KEY_RING_NAME
key_name = ""    # KEY_NAME
key_project = "" # KMS_PROJECT_ID (Defaults to billing_project if empty)

if key_project == "":
  key_project = billing_project
kms_key_conversation = f"projects/{key_project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{key_name}"

conversation = geminidataanalytics.Conversation()
conversation.agents = [f'projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}']
conversation.name = f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

# Optional: Add CMEK key if details are provided
if key_ring and key_name:
  if not key_project:
    key_project = billing_project
  kms_key_conversation = f"projects/{key_project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{key_name}"
  conversation.kms_key = kms_key_conversation # Add for CMEK

request = geminidataanalytics.CreateConversationRequest(
  parent=f"projects/{billing_project}/locations/{location}",
  conversation_id=conversation_id,
  conversation=conversation,
)

# Make the request
response = data_chat_client.create_conversation(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
  • conversation_1:对话的唯一标识符。
  • KEY_RING_NAME:如果使用 CMEK,则为 Cloud KMS 密钥环的名称。
  • KEY_NAME:如果使用 CMEK,则为 Cloud KMS 密钥的名称。
  • KMS_PROJECT_ID:如果使用 CMEK,则为密钥托管所在的项目 ID。

管理数据智能体及对话

以下代码示例展示了如何使用 Conversational Analytics API 管理数据智能体及对话。您可以执行以下任务:

获取数据智能体

以下示例 Python 代码演示了如何发出 API 请求来检索您之前创建的数据智能体。

# Initialize request arguments
data_agent_id = "data_agent_1"
request = geminidataanalytics.GetDataAgentRequest(
  name=f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
)

# Make the request
response = data_agent_client.get_data_agent(request=request)

# Handle the response
print(response)

在上面示例中,将值 data_agent_1 替换为您要检索的数据代理的唯一标识符。

列出数据智能体

以下代码演示了如何通过调用 list_data_agents 方法来列出给定项目的所有数据智能体。如需列出所有智能体,您必须拥有项目的 geminidataanalytics.dataAgents.list 权限。如需详细了解有哪些 IAM 角色包含此权限,请参阅预定义角色列表。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
request = geminidataanalytics.ListDataAgentsRequest(
  parent=f"projects/{billing_project}/locations/{location}",
)

# Make the request
page_result = data_agent_client.list_data_agents(request=request)

# Handle the response
for response in page_result:
  print(response)

YOUR-BILLING-PROJECT 替换为您的结算项目的 ID。

列出可访问的数据代理

以下代码演示了如何通过调用 list_accessible_data_agents 方法来列出给定项目的所有可访问的数据代理。

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
location = "global"
request = geminidataanalytics.ListAccessibleDataAgentsRequest(
  parent=f"projects/{billing_project}/locations/{location}",
  creator_filter=creator_filter
)

# Make the request
page_result = data_agent_client.list_accessible_data_agents(request=request)

# Handle the response
for response in page_result:
  print(response)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • YOUR-CREATOR-FILTER:要根据数据代理的创建者应用的过滤条件。可能的值包括 NONE(默认)、CREATOR_ONLYNOT_CREATOR_ONLY

更新数据智能体

以下示例代码演示了如何通过对数据智能体资源调用 update_data_agent_syncupdate_data_agent 方法来同步或异步更新数据智能体。该请求需要一个 DataAgent 对象,其中包含您要更改的字段的新值;还需要一个 update_mask 参数,该参数接收一个 FieldMask 对象作为输入来指定要更新的字段。

如需更新数据智能体,您必须拥有该智能体的 geminidataanalytics.dataAgents.update IAM 权限。如需详细了解有哪些 IAM 角色包含此权限,请参阅预定义角色列表。

同步

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
data_agent = geminidataanalytics.DataAgent()
data_agent.data_analytics_agent.published_context = published_context
data_agent.name = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent.description = "Updated description of the data agent."

update_mask = field_mask_pb2.FieldMask(paths=['description', 'data_analytics_agent.published_context'])

request = geminidataanalytics.UpdateDataAgentRequest(
  data_agent=data_agent,
  update_mask=update_mask,
)

try:
  # Make the request
  response = data_agent_client.update_data_agent_sync(request=request)
  print("Data Agent Updated")
  print(response)
except Exception as e:
  print(f"Error updating Data Agent: {e}")

异步

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
data_agent = geminidataanalytics.DataAgent()
data_agent.data_analytics_agent.published_context = published_context
data_agent.name = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent.description = "Updated description of the data agent."

update_mask = field_mask_pb2.FieldMask(paths=['description', 'data_analytics_agent.published_context'])

request = geminidataanalytics.UpdateDataAgentRequest(
  data_agent=data_agent,
  update_mask=update_mask,
)

try:
  # Make the request
  data_agent_client.update_data_agent(request=request)
  print("Data Agent Updated")
except Exception as e:
  print(f"Error updating Data Agent: {e}")

按如下所示替换示例值:

  • data_agent_1:要更新的数据智能体的 ID。
  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • Updated description of the data agent.:更新后的数据智能体的说明。

获取数据智能体的 IAM 政策

以下示例代码演示了如何使用 get_iam_policy 方法来获取数据智能体的 IAM 政策。该请求指定了数据智能体资源路径。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

resource = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
request = iam_policy_pb2.GetIamPolicyRequest(
      resource=resource,
    )
try:
  response = data_agent_client.get_iam_policy(request=request)
  print("IAM Policy fetched successfully!")
  print(f"Response: {response}")
except Exception as e:
  print(f"Error setting IAM policy: {e}")

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要获取其 IAM 政策的数据智能体的 ID。

为数据智能体设置 IAM 政策

如需共享智能体,您可以使用 set_iam_policy 方法向用户分配特定智能体的 IAM 角色。该请求包含用于指定应向哪些用户分配哪些角色的绑定。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "EMAIL_ADDRESS"

resource = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

# Construct the IAM policy
binding = policy_pb2.Binding(
  role=role,
  members= [f"user:{i.strip()}" for i in users.split(",")]
)

policy = policy_pb2.Policy(bindings=[binding])

# Create the request
request = iam_policy_pb2.SetIamPolicyRequest(
  resource=resource,
  policy=policy
)

# Send the request
try:
  response = data_agent_client.set_iam_policy(request=request)
  print("IAM Policy set successfully!")
  print(f"Response: {response}")
except Exception as e:
  print(f"Error setting IAM policy: {e}")

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要为其设置 IAM 政策的数据智能体的 ID。
  • EMAIL_ADDRESSES:要向其授予指定角色的用户的邮箱列表(以英文逗号分隔)。

删除数据智能体

以下示例代码演示了如何使用 delete_data_agent_syncdelete_data_agent 方法同步或异步软删除数据智能体。软删除智能体时,虽然该智能体会被删除,但在 30 天内仍可检索到。该请求指定了数据智能体资源网址。

同步

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

request = geminidataanalytics.DeleteDataAgentRequest(
  name=f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
)

try:
  # Make the request
  data_agent_client.delete_data_agent_sync(request=request)
  print("Data Agent Deleted")
except Exception as e:
  print(f"Error deleting Data Agent: {e}")

异步

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

request = geminidataanalytics.DeleteDataAgentRequest(
  name=f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
)

try:
  # Make the request
  data_agent_client.delete_data_agent(request=request)
  print("Data Agent Deleted")
except Exception as e:
  print(f"Error deleting Data Agent: {e}")

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要删除的数据智能体的 ID。

获取对话

以下示例代码演示了如何使用 get_conversation 方法来获取有关现有对话的信息。该请求指定了对话资源路径。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

request = geminidataanalytics.GetConversationRequest(
  name = f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
)

# Make the request
response = data_chat_client.get_conversation(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要提取的对话的 ID。

列出对话

以下示例代码演示了如何通过调用 list_conversations 方法来列出给定项目的对话。该请求指定了父级资源网址,即项目和位置(例如 projects/my-project/locations/global)。

默认情况下,此方法会返回您创建的对话。管理员(具有 cloudaicompanion.topicAdmin IAM 角色的用户)可以查看项目中的所有对话。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
request = geminidataanalytics.ListConversationsRequest(
  parent=f"projects/{billing_project}/locations/{location}",
)

# Make the request
response = data_chat_client.list_conversations(request=request)

# Handle the response
print(response)

YOUR-BILLING-PROJECT 替换为启用了所需 API 的结算项目的 ID。

列出对话中的消息

以下示例代码演示了如何使用 list_messages 方法来提取对话中的所有消息。该请求指定了对话资源路径。

如需列出消息,您必须拥有对话的 cloudaicompanion.topics.get 权限

billing_project = "YOUR-BILLING-PROJECT"
location = "global"

conversation_id = "conversation_1"

request = geminidataanalytics.ListMessagesRequest(
  parent=f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
)

# Make the request
response = data_chat_client.list_messages(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要列出其消息的对话的 ID。

删除对话

以下示例代码演示了如何删除一个对话。管理员(具有 cloudaicompanion.topicAdmin Identity and Access Management 角色的用户)或具有 cloudaicompanion.topics.delete Identity and Access Management 权限的用户可以删除项目中的对话。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

request = geminidataanalytics.DeleteConversationRequest(
  name = f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
)

# Make the request
response = data_chat_client.delete_conversation(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要列出其消息的对话的 ID。

使用 API 提问

创建数据代理并(对于有状态聊天)创建对话后,您可以向代理发送查询。以下代码示例展示了如何调用 data_chat_client.chat 方法。这些示例使用了您在为有状态或无状态聊天设置上下文中配置的上下文变量(例如数据源和系统指令)。

当您发送查询时,API 会返回一个 Message 对象流。此流可以包含不同类型的消息,包括文本、数据表和图表。文本消息可以提供有关代理推理的深入分析、报告其进度或给出最终答案。每条文本消息的用途都由其 TextType 值指示:

  • THOUGHT:显示智能体在规划如何回答您的查询时的内部思考过程。THOUGHT 消息可让您逐步了解智能体的推理和决策流程,包含两部分:parts[0] 是思考摘要,用于简要总结完整的思考文本;parts[1] 是完整的思考文本。
  • PROGRESS:报告智能体在执行操作(例如检索数据或调用工具)时的进度。此值仅针对 Looker 数据源返回,包含两部分:parts[0] 是摘要,parts[1] 是完整进度文本。
  • FINAL_RESPONSE:提供查询的最终答案。

以下代码示例使用 定义辅助函数中定义的 show_message 辅助函数来处理和显示流中的每条消息。如需了解在使用 Looker 数据源时如何在界面中呈现这些消息,请参阅呈现 Looker 数据源的代理回答

有状态聊天

发送包含 Conversation 引用的有状态聊天请求

您可以通过引用之前创建的 Conversation 资源,向数据代理发送有状态的聊天请求。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Create a conversation_reference
conversation_reference = geminidataanalytics.ConversationReference()
conversation_reference.conversation = f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
conversation_reference.data_agent_context.data_agent = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
# conversation_reference.data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent = f"projects/{billing_project}/locations/{location}",
    messages = messages,
    conversation_reference = conversation_reference
)

# Make the request
stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

# Handle the response
for response in stream:
    show_message(response)

按如下所示替换示例值:

  • Which species of tree is most prevalent?:要发送给数据智能体的自然语言问题。
  • data_agent_1:数据代理的唯一标识符,如创建数据代理中所定义。
  • conversation_1:对话的唯一标识符,如创建对话中所定义。

无状态聊天

以下代码示例演示了如何在为无状态聊天设置上下文时向数据代理发送查询。您可以通过引用之前定义的 DataAgent 资源或在请求中使用内嵌上下文来发送无状态查询。

发送包含 DataAgent 引用的无状态聊天请求

您可以通过引用之前创建的 DataAgent 资源,向数据代理发送请求。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"

data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent=f"projects/{billing_project}/locations/{location}",
    messages=messages,
    data_agent_context = data_agent_context
)

# Make the request
stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

# Handle the response
for response in stream:
    show_message(response)

按如下所示替换示例值:

  • Which species of tree is most prevalent?:要发送给数据智能体的自然语言问题。
  • data_agent_1:数据代理的唯一标识符,如创建数据代理中所定义。

发送包含内嵌上下文的无状态对话请求

以下示例代码演示了如何使用 inline_context 参数直接在无状态聊天请求中提供上下文。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

request = geminidataanalytics.ChatRequest(
    inline_context=inline_context,
    parent=f"projects/{billing_project}/locations/{location}",
    messages=messages,
)

# Make the request
stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

# Handle the response
for response in stream:
    show_message(response)

在上面的示例中,将 Which species of tree is most prevalent? 替换为要发送给数据代理的自然语言问题。

创建无状态多轮对话

如需在无状态对话中提出后续问题,您的应用必须通过在每次新请求中发送完整的消息历史记录来管理对话的上下文。以下示例展示了如何通过引用数据代理或使用内嵌上下文直接提供数据源来创建多轮对话。

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Use data agent context
data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Helper function for calling the API
def multi_turn_Conversation(msg):

  message = geminidataanalytics.Message()
  message.user_message.text = msg

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(message)

  request = geminidataanalytics.ChatRequest(
    parent=f"projects/{billing_project}/locations/{location}",
    messages=conversation_messages,
    # Use data agent context
    data_agent_context=data_agent_context,
    # Use inline context
    # inline_context=inline_context,
  )

  # Make the request
  stream = data_chat_client.chat(request=request, timeout=300 #custom timeout up to 600s)

  # Handle the response
  for response in stream:
    show_message(response)
    conversation_messages.append(response)

# Send the first turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

在上面的示例中,按如下所示替换示例值:

  • data_agent_1:数据代理的唯一标识符,如创建数据代理部分的示例代码块中所定义。
  • Which species of tree is most prevalent?:要发送给数据智能体的自然语言问题。
  • Can you show me the results as a bar chart?:基于或优化上一个问题的后续问题。

定义辅助函数

以下示例代码包含前面代码示例中使用的辅助函数定义。这些函数有助于解析来自 API 的回答并显示结果。

import json as json_lib
import textwrap
import time

import altair as alt
import IPython
import pandas as pd
import proto
import requests

from google.protobuf.json_format import MessageToDict, MessageToJson
from IPython.display import display, HTML
from pygments import highlight, lexers, formatters
from google.protobuf import field_mask_pb2
from google.iam.v1 import policy_pb2
from google.iam.v1 import iam_policy_pb2

def handle_text_response(resp):
  parts = resp.parts
  full_text = "".join(parts)
  if "\n" not in full_text and len(full_text) > 80:
    wrapped_text = textwrap.fill(full_text, width=80)
    print(wrapped_text)
  else:
    print(full_text)

def display_schema(data):
  fields = getattr(data, "fields")
  df = pd.DataFrame({
    "Column": map(lambda field: getattr(field, 'name'), fields),
    "Type": map(lambda field: getattr(field, 'type'), fields),
    "Description": map(lambda field: getattr(field, 'description', '-'), fields),
    "Mode": map(lambda field: getattr(field, 'mode'), fields)
  })
  display(df)

def display_section_title(text):
  display(HTML('<h2>{}</h2>'.format(text)))

def format_looker_table_ref(table_ref):
  return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref.lookml_model, table_ref.explore, table_ref.looker_instance_uri)

def format_bq_table_ref(table_ref):
  return '{}.{}.{}'.format(table_ref.project_id, table_ref.dataset_id, table_ref.table_id)

def display_datasource(datasource):
  source_name = ''
  if 'studio_datasource_id' in datasource:
    source_name = getattr(datasource, 'studio_datasource_id')
  elif 'looker_explore_reference' in datasource:
    source_name = format_looker_table_ref(getattr(datasource, 'looker_explore_reference'))
  else:
    source_name = format_bq_table_ref(getattr(datasource, 'bigquery_table_reference'))

  print(source_name)
  display_schema(datasource.schema)

def handle_schema_response(resp):
  if 'query' in resp:
    print(resp.query.question)
  elif 'result' in resp:
    display_section_title('Schema resolved')
    print('Data sources:')
    for datasource in resp.result.datasources:
      display_datasource(datasource)

def handle_data_response(resp):
  if "query" in resp:
    query = resp.query
    display_section_title("Retrieval query")
    print(f"Query name: {query.name}")
    if "question" in query:
      print(f"Question: {query.question}")
    if "datasources" in query:
      print("Data sources:")
      for datasource in query.datasources:
        display_datasource(datasource)
  elif "generated_sql" in resp:
    display_section_title("SQL generated")
    print(resp.generated_sql)
  elif "result" in resp:
    display_section_title("Data retrieved")

    fields = [field.name for field in resp.result.schema.fields]
    d = {}
    for el in resp.result.data:
      for field in fields:
        if field in d:
          d[field].append(el[field])
        else:
          d[field] = [el[field]]

    display(pd.DataFrame(d))

def handle_chart_response(resp):
  def _value_to_dict(v):
    if isinstance(v, proto.marshal.collections.maps.MapComposite):
      return _map_to_dict(v)
    elif isinstance(v, proto.marshal.collections.RepeatedComposite):
      return [_value_to_dict(el) for el in v]
    elif isinstance(v, (int, float, str, bool)):
      return v
    else:
      return MessageToDict(v)

  def _map_to_dict(d):
    out = {}
    for k in d:
      if isinstance(d[k], proto.marshal.collections.maps.MapComposite):
        out[k] = _map_to_dict(d[k])
      else:
        out[k] = _value_to_dict(d[k])
    return out

  if 'query' in resp:
    print(resp.query.instructions)
  elif 'result' in resp:
    vegaConfig = resp.result.vega_config
    vegaConfig_dict = _map_to_dict(vegaConfig)
    alt.Chart.from_json(json_lib.dumps(vegaConfig_dict)).display();

def show_message(msg):
  m = msg.system_message
  if 'text' in m:
    handle_text_response(getattr(m, 'text'))
  elif 'schema' in m:
    handle_schema_response(getattr(m, 'schema'))
  elif 'data' in m:
    handle_data_response(getattr(m, 'data'))
  elif 'chart' in m:
    handle_chart_response(getattr(m, 'chart'))
  print('\n')