使用 HTTP 和 Python 构建数据代理

本页面将引导您使用 Python 向 Conversational Analytics API(通过 geminidataanalytics.googleapis.com 访问)发出 HTTP 请求。

本页面上的示例 Python 代码展示了如何完成以下任务:

此页面末尾包含完整版示例代码,以及用于流式传输 API 响应的辅助函数

配置初始设置和身份验证

以下示例 Python 代码会执行以下任务:

  • 导入所需的 Python 库
  • 使用 Google Cloud CLI 获取用于 HTTP 身份验证的访问令牌
  • 定义结算项目、位置和系统指令的变量
import json
import json as json_lib
import textwrap

import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
}

billing_project = 'YOUR-BILLING-PROJECT'
location = "global"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT启用了所需 API 的结算项目的 ID。
  • YOUR-SYSTEM-INSTRUCTIONS:用于指导智能体行为并根据您的数据需求自定义智能体的系统指令。例如,您可以使用系统指令来定义业务术语、控制回答长度或设置数据格式。最好使用撰写有效的系统指令部分推荐的 YAML 格式定义系统指令,以提供详尽的结构化指导。

向 Looker 进行身份验证

如果您打算连接到 Looker 数据源,则需要向 Looker 实例进行身份验证。

使用 API 密钥

以下 Python 代码示例演示了如何使用 API 密钥向 Looker 实例进行代理身份验证。

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

按如下所示替换示例值:

  • YOUR-LOOKER-CLIENT-ID:您生成的 Looker API 密钥的客户端 ID。
  • YOUR-LOOKER-CLIENT-SECRET:您生成的 Looker API 密钥的客户端密钥。

使用访问令牌

以下 Python 代码示例演示了如何使用访问令牌向 Looker 实例进行代理身份验证。

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

按如下所示替换示例值:

  • YOUR-TOKEN:您生成的用于向 Looker 进行身份验证的 access_token 值。

向 AlloyDB for PostgreSQL 进行身份验证

如果您计划连接到 AlloyDB 数据源,请使用 gcloud CLI 向 AlloyDB 实例进行身份验证。

  1. 通过在 AlloyDB 实例上启用 alloydb.iam_authentication 标志,在 AlloyDB 上启用 Identity and Access Management (IAM) 身份验证。
  2. 授予 IAM 角色。调用对话式分析 API 的正文(用户或服务账号)需要在您的Google Cloud 项目中拥有适当的 IAM 权限。这通常包括 alloydb.databaseUser 等角色。
  3. 创建相应的数据库用户。创建与 IAM 主账号对应的 AlloyDB 用户,并使用用户或服务账号电子邮件地址作为用户名。

如需了解详情,请参阅 IAM 数据库身份验证

向 Cloud SQL for MySQL 和 Cloud SQL for PostgreSQL 进行身份验证

如果您打算连接到 Cloud SQL for MySQL 或 Cloud SQL for PostgreSQL 数据源,则必须向 Cloud SQL 实例进行身份验证。

  1. 启用 IAM 数据库身份验证。确保 Cloud SQL for MySQL 实例中的 cloudsql.iam_authentication 标志设置为 on。您可以在创建实例时或修补现有实例时添加此参数。

    gcloud sql instances patch INSTANCE_NAME --database-flags
    cloudsql.iam_authentication=on
    
  2. 授予所需的 IAM 角色。

    尝试连接的主账号(用户或服务账号)需要对项目拥有 cloudsql.instances.logincloudsql.instances.connect IAM 权限。如需授予这些角色和权限,请执行以下任一操作:

    1. 授予 roles/cloudsql.instanceUser 角色。这包括登录实例 (cloudsql.instances.login) 和使用代理连接 (cloudsql.instances.connect) 的权限。
    2. 或者,授予至少包含 cloudsql.instances.logincloudsql.instances.connect 的自定义角色。
  3. IAM 主账号添加为数据库用户。在 Cloud SQL 实例中创建一个与 IAM 主账号的电子邮件地址对应的用户。

    对于服务账号,请运行以下命令:

    gcloud sql users create SERVICE_ACCOUNT_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_service_account
    

    对于用户账号,请运行以下命令:

    gcloud sql users create USER_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_user
    
  4. 授予数据库权限。通过 IAM 进行连接可对用户进行数据库身份验证,但不会自动授予数据库中的权限。您需要使用标准 PostgreSQL 命令(例如 GRANT)为这个新数据库用户授予对特定对象(例如表和架构)的权限。

  5. 以超级用户(例如默认的 postgres 用户)身份连接,然后运行以下命令:

    -- Example: Connect using psql as the 'postgres' user
    
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO
    "user-or-service-account@example.com";
    
  6. 使用 IAM 身份验证进行连接。

    客户端通常通过 Cloud SQL Auth 代理或连接器库进行连接,这些工具会处理令牌交换。数据库用户名是 IAM 正文的电子邮件地址。不使用密码。而是使用 OAuth 2.0 令牌,该令牌通常由 Cloud SQL Auth 代理或 Google Cloud CLI 自动处理。

    以下示例使用 psql 和 gcloud CLI 生成令牌:

    PGPASSWORD=$(gcloud sql generate-login-token) psql --host=HOST_IP \
    --username=IAM_PRINCIPAL_EMAIL --dbname=DATABASE_NAME\
    

    如需了解详情,请参阅 IAM 身份验证

    按如下所示替换示例值:

    • INSTANCE_NAME:您要修补以启用 IAM 数据库身份验证的 Cloud SQL 实例的 ID。
    • SERVICE_ACCOUNT_EMAIL:服务账号的电子邮件地址。
    • USER_EMAIL:用户账号的电子邮件地址。
    • HOST_IP:Cloud SQL 实例的 IP 地址。
    • IAM_PRINCIPAL_EMAIL:IAM 主账号(例如服务账号或用户)的电子邮件地址。
    • DATABASE_NAME:要连接的数据库的名称。

向 Spanner 进行身份验证

如果您计划连接到 Spanner 数据源,则需要对 Spanner 实例进行身份验证。

  1. 授予 IAM 角色。调用对话式分析 API 的正文(用户或服务账号)需要在您的Google Cloud 项目中拥有适当的 IAM 权限。这通常包括 databaseUser 等角色。
  2. 使用数据定义语言 (DDL) 在 Spanner 中创建相应的抽象数据库角色。
  3. 使用 IAM 政策允许 IAM 主账号代入这些数据库角色。

如需了解详情,请参阅精细访问权限控制权限

连接到数据源

以下部分介绍了如何为代理的数据源定义连接详细信息。您的代理可以通过以下方式连接到数据:

连接到 Looker 数据

以下示例代码定义了与 Looker 探索的连接。如需与 Looker 实例建立连接,请验证您是否已生成 Looker API 密钥,如使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述。 您一次最多可以使用对话式分析 API 连接到 5 个 Looker 探索。

连接到 Looker 数据源时,请注意以下事项:

  • 您可以在对话中查询任何包含的探索。
  • 代理一次只能查询一个探索。无法同时对多个探索进行查询。
  • 在一次对话中,代理可以查询多个探索。
  • 在包含多部分问题的对话中,或者在包含后续问题的对话中,代理可以在一次对话中查询多个探索。

    例如:某用户连接了两个探索,一个名为 cat-explore,另一个名为 dog-explore。用户输入问题“猫的数量多还是狗的数量多?”这会创建两个查询:一个用于统计 cat-explore 中的猫数,另一个用于统计 dog-explore 中的狗数。在完成这两个查询后,代理会比较这两个查询中的数字。

以下示例代码定义了与多个 Looker 探索的连接。为了提高代理的性能,您可以选择性地为探索提供黄金查询作为结构化上下文。如需了解详情,请参阅为 Looker 数据源定义数据代理上下文

looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
    "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
       # Do not include the following line during agent creation
       "credentials": looker_credentials
    }
}

按如下所示替换示例值:

  • https://your_company.looker.com:Looker 实例的完整网址。
  • your_model:包含您要连接的探索的 LookML 模型的名称。
  • your_explore:您希望数据代理查询的 Looker 探索的名称。
  • your_model_2:包含您要连接的探索的第二个 LookML 模型的名称。您可以针对最多 5 个探索重复使用此变量,以添加更多模型。
  • your_explore_2:您希望数据代理查询的其他 Looker 探索的名称。您可以重复使用此变量,最多可添加 5 个探索。

连接到 BigQuery 数据

借助 Conversational Analytics API,您可以连接的 BigQuery 表的数量没有硬性限制。不过,连接到大量表格可能会降低准确率,或者导致您超出模型的输入令牌限制。

以下示例代码定义了与多个 BigQuery 表的连接,并包含可选的结构化上下文字段的示例。为了提高代理的性能,您可以选择性地为 BigQuery 表提供结构化上下文,例如表和列的说明、同义词、标记和示例查询。如需了解详情,请参阅为 BigQuery 数据源定义数据代理上下文

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id",
                "schema": {
                    "description": "my_table_description",
                    "fields": [{
                        "name": "my_column_name",
                        "description": "my_column_description"
                    }]
                }
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

按如下所示替换示例值:

  • my_project_id:包含您要连接到的 BigQuery 数据集和表的 Google Cloud 项目的 ID。如需连接到公共数据集,请指定 bigquery-public-data
  • my_dataset_id:BigQuery 数据集的 ID。
  • my_table_id:BigQuery 表的 ID。
  • my_table_description:对表内容和用途的可选说明。
  • my_column_name:表中要为其提供可选说明的列的名称。
  • my_column_description:对列内容和用途的可选说明。

连接到数据洞察数据

以下示例代码定义了与数据洞察数据源的连接。

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

studio_datasource_id 替换为数据源 ID。

连接到 AlloyDB 数据

对话式分析 API 使用 alloydb 字段中的 database_reference 字段连接到 AlloyDB 集群。

以下示例定义了与 AlloyDB 数据库的连接。

alloydb_data_source = {
    "alloydb": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }

    }
}

按如下所示替换示例值:

  • PROJECT_ID:包含 AlloyDB 集群的 Google Cloud 项目的 ID。
  • REGION:AlloyDB 集群所在的区域,例如 us-central1
  • CLUSTER_ID:AlloyDB 集群的 ID。
  • INSTANCE_ID:AlloyDB 实例的 ID。
  • DATABASE:目标数据库的名称,例如 financial
  • TABLE_1_IDTABLE_2_ID:可选。数据代理建议使用的数据库中的表列表,例如 "loan""client""disp"
  • billing_project:启用了所需 API 的结算项目的 ID。
  • location:上下文集所在的位置,例如 global。
  • your_context_set_id:如果您为代理创建了高级情境,则为情境集的 ID。

连接到 Cloud SQL for MySQL 和 Cloud SQL for PostgreSQL 数据

对话式分析 API 使用 sql 字段中的 database_reference 字段连接到您的 Cloud SQL 实例。

以下示例定义了与 Cloud SQL for MySQL 或 Cloud SQL for PostgreSQL 数据库的连接。

sql_data_source = {
    "cloud_sql_reference": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE_ID",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }
    }
}

按如下所示替换示例值:

  • PROJECT_ID:包含 Cloud SQL 实例的 Google Cloud 项目的 ID。
  • REGION:Cloud SQL 实例所在的区域,例如 us-central1
  • CLUSTER_ID:Cloud SQL 集群的 ID。
  • INSTANCE_ID:Cloud SQL 实例的 ID。
  • DATABASE_ID:目标数据库的名称,例如 financial
  • TABLE_1_IDTABLE_2_ID:可选。数据代理建议使用的数据库中的表列表,例如 "loan""client""disp"
  • billing_project:启用了所需 API 的结算项目的 ID。
  • location:上下文集所在的位置,例如 global。
  • your_context_set_id:如果您为代理创建了高级情境,则为情境集的 ID。

连接到 Spanner 数据

对话式分析 API 使用 spanner 字段中的 database_reference 字段连接到您的 Spanner 实例。

以下示例定义了与 Spanner 数据库的连接。

spanner_data_sources = {
    "spanner_reference": {
        "database_reference": {
            "project_id": "PROJECT_ID",
            "region": location,
            "engine": GOOGLE_SQL,
            "instance_id": "INSTANCE_ID",
            "database_id": "DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
        },
        # Optional: Include this if you have pre-authored context for the agent
        # "agent_context_reference": {
        #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
        # }
    }
}

按如下所示替换示例值:

  • PROJECT_ID:包含 Spanner 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:Spanner 实例的 ID。
  • DATABASE:目标数据库的名称,例如 financial
  • TABLE_1_IDTABLE_2_ID:建议数据代理使用的数据库中的表列表,例如 "loan""client""disp"
  • billing_project:已启用所需 API 的结算项目的 ID。
  • location:上下文集所在的位置,例如 global。
  • your_context_set_id:如果您为代理创建了高级情境,则为情境集的 ID。

创建数据代理

以下示例代码演示了如何通过向数据智能体创建端点发送 HTTP POST 请求来同步或异步创建数据智能体。请求载荷包含以下详细信息:

  • 代理的完整资源名称。此值包含项目 ID、位置和代理的唯一标识符。
  • 数据代理的说明。
  • 数据代理的上下文,包括系统说明(在配置初始设置和身份验证中定义)和代理使用的数据源(在连接到数据源中定义)。

您可以选择在创建数据代理时提供 kms_key,以使用客户管理的加密密钥 (CMEK) 保护数据代理。CMEK 仅适用于 Looker 数据源。如需了解详情,请参阅客户管理的加密密钥 (CMEK)

您还可以选择在请求载荷中添加 options 参数,以启用使用 Python 进行高级分析。 如需详细了解 options 参数以及您可以为对话配置的选项,请参阅 REST 资源:projects.locations.dataAgents

同步

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents:createSync"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

异步

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

按如下所示替换示例值:

  • data_agent_1:数据智能体的唯一标识符。此值用于代理的资源名称中,并作为 data_agent_id 网址查询参数。
  • KMS_PROJECT_ID:如果使用 CMEK,则为密钥托管所在的项目 ID。如果未提供,则默认为您的结算项目。
  • KEY_RING_NAME:如果使用 CMEK,则为 Cloud KMS 密钥环的名称。
  • KEY_NAME:如果使用 CMEK,则为 Cloud KMS 密钥的名称。
  • This is the description of data_agent_1.:数据智能体的说明。

创建对话

以下示例代码演示了如何创建与数据代理的对话。

您可以在创建对话时提供 kms_key,选择性地使用 CMEK 保护对话。CMEK 仅适用于 Looker 数据源。如需了解详情,请参阅客户管理的加密密钥 (CMEK)

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
}

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  conversation_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

按如下所示替换示例值:

  • data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
  • conversation_1:对话的唯一标识符。
  • KMS_PROJECT_ID:如果使用 CMEK,则为密钥托管所在的项目 ID。如果未提供,则默认为您的结算项目。
  • KEY_RING_NAME:如果使用 CMEK,则为 Cloud KMS 密钥环的名称。
  • KEY_NAME:如果使用 CMEK,则为 Cloud KMS 密钥的名称。

管理数据智能体及对话

以下代码示例展示了如何使用 Conversational Analytics API 管理数据智能体及对话。您可以执行以下任务:

获取数据智能体

以下示例代码演示了如何通过向数据智能体资源网址发送 HTTP GET 请求来获取现有数据智能体。

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

在上面的示例中,将 data_agent_1 替换为要获取的数据智能体的 ID。

列出数据智能体

以下代码演示了如何通过向 dataAgents 端点发送 HTTP GET 请求来列出给定项目的所有数据智能体。

如需列出所有智能体,您必须拥有项目的 geminidataanalytics.dataAgents.list 权限。如需详细了解有哪些 IAM 角色包含此权限,请参阅预定义角色列表。

billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

YOUR-BILLING-PROJECT 替换为您的结算项目的 ID。

列出可访问的数据代理

以下代码演示了如何通过向 dataAgents:listAccessible 端点发送 HTTP GET 请求来列出给定项目的所有可访问的数据代理。

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • YOUR-CREATOR-FILTER:要根据数据代理的创建者应用的过滤条件。可能的值包括 NONE(默认)、CREATOR_ONLYNOT_CREATOR_ONLY

更新数据智能体

以下示例代码演示了如何通过向数据智能体资源网址发送 HTTP PATCH 请求来同步或异步更新数据智能体。请求载荷包含您要更改的字段的新值;请求参数包含 updateMask 参数,用于指定要更新的字段。

同步

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:updateSync"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

异步

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

按如下所示替换示例值:

  • data_agent_1:要更新的数据智能体的 ID。
  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • Updated description of the data agent.:数据智能体的新说明。

获取数据智能体的 IAM 政策

以下示例代码演示了如何通过向数据智能体网址发送 HTTP POST 请求来获取数据智能体的 IAM 政策。请求载荷包含数据智能体路径。

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要获取其 IAM 政策的数据智能体的 ID。

为数据智能体设置 IAM 政策

如需共享智能体,您可以使用 setIamPolicy 方法向用户分配特定智能体的 IAM 角色。以下示例代码演示了如何使用包含绑定的载荷对数据智能体网址进行 POST 调用。绑定指定了应向哪些用户分配哪些角色。

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要为其设置 IAM 政策的数据智能体的 ID。
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com:要向其授予指定角色的用户的邮箱列表(以英文逗号分隔)。

删除数据智能体

以下示例代码演示了如何通过向数据智能体资源网址发送 HTTP DELETE 请求来同步或异步软删除数据智能体。软删除意味着,虽然该智能体会被删除,但在 30 天内仍可检索到。

同步

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:deleteSync"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

异步

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • data_agent_1:要删除的数据智能体的 ID。

获取对话

以下示例代码演示了如何通过向对话资源网址发送 HTTP GET 请求来提取现有对话。

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要提取的对话的 ID。

列出对话

以下示例代码演示了如何通过向 conversations 端点发送 HTTP GET 请求来列出给定项目的对话。

默认情况下,此方法会返回您创建的对话。管理员(具有 cloudaicompanion.topicAdmin IAM 角色的用户)可以查看项目中的所有对话。

billing_project = "YOUR-BILLING-PROJECT"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

YOUR-BILLING-PROJECT 替换为启用了所需 API 的结算项目的 ID。

列出对话中的消息

以下示例代码演示了如何通过向对话的 messages 端点发送 HTTP GET 请求来列出对话中的所有消息。

如需列出消息,您必须拥有对话的 cloudaicompanion.topics.get 权限

billing_project = "YOUR-BILLING-PROJECT"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要列出其消息的对话的 ID。

删除对话

以下示例代码演示了如何通过向对话资源网址发送 HTTP DELETE 请求来删除对话。管理员(具有 cloudaicompanion.topicAdmin Identity and Access Management 角色的用户)或具有 cloudaicompanion.topics.delete Identity and Access Management 权限的用户可以删除项目中的对话。

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.delete(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation deleted successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while deleting conversation: {conversation_response.status_code}")
    print(conversation_response.text)

按如下所示替换示例值:

  • YOUR-BILLING-PROJECT:结算项目的 ID。
  • conversation_1:要删除的对话的 ID。

使用 API 提问

创建数据代理和(对于有状态的聊天)对话后,您可以向代理发送查询。

当您发送查询时,该 API 会返回 Message 对象的流。此数据流可以包含不同类型的消息,包括文本、数据表和图表。文本消息可以提供有关代理推理的深入信息、报告其进度或给出最终答案。每条文本消息的用途由其 TextType 值指示:

  • THOUGHT:显示智能体在规划如何回答您的查询时所进行的内部思考过程。THOUGHT 消息可让您逐步了解代理的推理和决策流程,包含两部分:parts[0] 是思考摘要,用于简要总结完整的思考文本;parts[1] 是完整的思考文本。
  • PROGRESS:报告智能体在执行操作(例如检索数据或调用工具)时的进度。此值仅针对 Looker 数据源返回,包含两部分:parts[0] 是摘要,parts[1] 是完整的进度文本。
  • FINAL_RESPONSE:提供查询的最终答案。

下一部分中的示例使用用于流式传输聊天回答的辅助 Python 函数来处理和显示 API 流式传输回答中的每条消息。如需了解在使用 Looker 数据源时如何在界面中呈现这些消息,请参阅呈现 Looker 数据源的代理回答

通过有状态和无状态聊天提问

您可以通过以下主要模式与 API 进行交互:

  • 有状态聊天: Google Cloud 会存储和管理对话历史记录。有状态聊天本身就是多轮聊天,因为 API 会保留之前消息中的上下文。您只需在每轮中发送当前消息。
  • 无状态聊天:您的应用会管理对话历史记录。您必须在每条新消息中添加相关的先前消息。如需查看有关如何在无状态模式下管理多轮对话的详细示例,请参阅创建无状态多轮对话

以下示例演示了如何通过向 :chat 端点发出 POST 请求,使用 API 进行有状态和无状态聊天。

有状态聊天

发送包含对话引用的有状态聊天请求

以下示例代码演示了如何使用您在之前的步骤中定义的对话向 API 提问。此示例使用 get_stream 辅助函数来流式传输回答。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

按如下所示替换示例值:

  • data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
  • conversation_1:对话的唯一标识符。
  • Make a bar graph for the top 5 states by the total number of airports 用作示例提示。

无状态聊天

以下示例展示了如何使用数据代理引用或内嵌上下文来发送无状态请求。

发送包含数据代理引用的无状态聊天请求

以下示例代码演示了如何使用您在之前的步骤中定义的数据代理向 API 提出无状态问题。此示例使用 get_stream 辅助函数来流式传输回答。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

按如下所示替换示例值:

  • data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
  • Make a bar graph for the top 5 states by the total number of airports 用作示例提示。

发送包含内嵌上下文的无状态对话请求

以下示例代码演示了如何使用内嵌上下文向 API 提出无状态问题。此示例使用 get_stream 辅助函数来流式传输回答,并使用 BigQuery 数据源作为示例。

您还可以选择在请求载荷中添加 options 参数,以启用使用 Python 进行高级分析。 如需详细了解 options 参数以及您可以为对话配置的选项,请参阅 REST 资源:projects.locations.dataAgents 页面。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

以下示例代码演示了如何使用内嵌上下文向 API 提出无状态问题,并以 AlloyDB 数据源为例。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

# The natural language question to ask the data agent
user_prompt = "what is the average loan amount in the US?"  # Replace with your question

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": user_prompt
            }
        }
    ],
    "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
    }
}

print(f"Sending prompt to :chat: '{user_prompt}'")
print(f"Endpoint: {chat_url}")
print(f"Payload: {json.dumps(chat_payload, indent=2)}")

get_stream(chat_url, chat_payload)

创建无状态多轮对话

如需在无状态对话中提出后续问题,您的应用必须通过在每次新请求中发送完整的消息记录来管理对话的上下文。以下部分展示了如何定义和调用辅助函数来创建多轮对话:

发送多轮请求

以下 multi_turn_Conversation 辅助函数通过将消息存储在列表中来管理对话上下文。这样,您可以发送基于之前的轮次的后续问题。在函数的载荷中,您可以引用数据代理,也可以使用内嵌上下文直接提供数据源。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

在上述示例中,将 data_agent_1 替换为数据代理的 ID,如创建数据代理部分的示例代码块中所定义。

您可以在每轮对话中调用 multi_turn_Conversation 辅助函数。以下示例代码展示了如何发送初始请求,然后发送基于之前回答的后续请求。

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

在上面的示例中,按如下所示替换示例值:

  • Which species of tree is most prevalent?:要发送给数据智能体的自然语言问题。
  • Can you show me the results as a bar chart?:基于或优化上一个问题的后续问题。

处理回答

以下 get_stream_multi_turn 函数用于处理流式 API 响应。此函数类似于 get_stream 辅助函数,但它会将回答存储在 conversation_messages 列表中,以便保存下一轮对话的对话上下文。

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

端到端代码示例

以下可展开的代码示例包含本指南中涵盖的所有任务。

使用 HTTP 和 Python 为 BigQuery 和 Looker 构建数据代理

    import json
    import json as json_lib
    import textwrap

    import altair as alt
    import google.auth
    from google.auth.transport.requests import Request
    import IPython
    from IPython.display import display, HTML
    import pandas as pd
    from pygments import formatters, highlight, lexers
    import requests

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
        # Do not include the following line during agent creation
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references": [
                {
                  "studio_datasource_id": "studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

定义辅助函数

以下示例代码包含前面代码示例中使用的辅助函数定义。这些函数有助于解析来自 API 的回答并显示结果。

用于流式传输聊天回答的辅助 Python 函数

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      full_text = "".join(parts)
      if "\n" not in full_text and len(full_text) > 80:
        wrapped_text = textwrap.fill(full_text, width=80)
        print(wrapped_text)
      else:
        print(full_text)

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        if 'question' in query:
          print('Question: {}'.format(query['question']))
        if 'datasources' in query:
          print('Data sources:')
          for datasource in query['datasources']:
            display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''