HTTP と Python を使用してデータ エージェントを構築する

このページでは、Python を使用して会話分析 APIgeminidataanalytics.googleapis.com を介してアクセス)に HTTP リクエストを行う方法について説明します。

このページの Python コードサンプルは、次のタスクを行う方法を示しています。

このページの最後に、サンプルコードの完全版と、API レスポンスのストリーミングに使用されるヘルパー関数が記載されています。

初期設定と認証を構成する

以下の Python サンプルコードは、次のタスクを実行します。

  • 必要な Python ライブラリをインポートする
  • Google Cloud CLI を使用して HTTP 認証用のアクセス トークンを取得する
  • 課金プロジェクトとシステム指示の変数を定義する
import json
import json as json_lib
import textwrap

import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
}

billing_project = 'YOUR-BILLING-PROJECT'
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 必要な API を有効にした課金プロジェクトの ID。
  • YOUR-SYSTEM-INSTRUCTIONS: エージェントの動作をガイドし、データのニーズに合わせてカスタマイズするためのシステム指示。たとえば、システム指示を使用してビジネス用語を定義したり、回答の長さを制御したり、データの形式を設定できます。理想的には、効果的なシステム指示を記述するで推奨されている YAML 形式でシステム指示を定義し、構造化された詳細なガイダンスを提供します。

Looker で認証を行う

Looker データソースに接続する場合は、Looker インスタンスで認証を行う必要があります。

API キーの使用

次の Python コードサンプルは、API キーを使用してエージェントを Looker インスタンスに対して認証する方法を示しています。

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

サンプル値を次のように置き換えます。

  • YOUR-LOOKER-CLIENT-ID: 生成された Looker API キーのクライアント ID。
  • YOUR-LOOKER-CLIENT-SECRET: 生成された Looker API キーのクライアント シークレット。

アクセス トークンを使用する

次の Python コードサンプルは、アクセス トークンを使用してエージェントを Looker インスタンスに対して認証する方法を示しています。

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

サンプル値を次のように置き換えます。

  • YOUR-TOKEN: Looker の認証用に生成する access_token 値。

データソースに接続する

以降のセクションでは、エージェントのデータソースの接続の詳細を定義する方法について説明します。エージェントは、LookerBigQueryLooker Studio のデータに接続できます。

Looker データに接続する

次のサンプルコードは、Looker Explore への接続を定義します。Looker インスタンスとの接続を確立するには、会話分析 API を使用してデータソースの認証と接続を行うで説明されているように、Looker API キーが生成されていることを確認します。Conversational Analytics API を使用すると、一度に最大 5 つの Looker Explore に接続できます。

Looker データソースに接続する場合は、次の点に注意してください。

  • 会話で、含まれている Explore をクエリできます。
  • エージェントが一度にクエリできる Explore は 1 つのみです。複数のデータ探索にまたがって同時にクエリを実行することはできません。
  • エージェントは、同じ会話で複数の探索に対してクエリを実行できます。
  • エージェントは、複数の部分から構成される質問を含む会話や、フォローアップの質問を含む会話で、複数の探索をクエリできます。

    たとえば、ユーザーが 2 つの Explore(cat-exploredog-explore)を接続します。ユーザーが「猫の数と犬の数ではどちらが多いですか?」という質問を入力します。これにより、cat-explore の猫の数をカウントするクエリと、dog-explore の犬の数をカウントするクエリの 2 つが作成されます。エージェントは、両方のクエリを完了した後、両方のクエリの数値を比較します。

looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
    "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
       # Do not include the following line during agent creation
       "credentials": looker_credentials
    }
}

サンプル値を次のように置き換えます。

  • https://your_company.looker.com: Looker インスタンスの完全な URL。
  • your_model: 接続する Explore を含む LookML モデルの名前。
  • your_explore: データ エージェントがクエリを実行する Looker Explore の名前。
  • your_model_2: 接続する Explore を含む 2 番目の LookML モデルの名前。この変数は、最大 5 つの Explore の追加モデルに対して繰り返すことができます。
  • your_explore_2: データ エージェントがクエリを実行する追加の Looker Explore の名前。この変数を繰り返して、最大 5 つの探索を含めることができます。

BigQuery データに接続する

Conversational Analytics API では、接続できる BigQuery テーブルの数に上限はありません。ただし、多数のテーブルに接続すると、精度が低下したり、モデルの入力トークン上限を超えたりする可能性があります。

次のサンプルコードは、複数の BigQuery テーブルへの接続を定義し、オプションの構造化コンテキスト フィールドの例を示しています。エージェントのパフォーマンスを向上させるために、テーブルと列の説明、同義語、タグ、クエリの例など、BigQuery テーブルの構造化コンテキストを必要に応じて指定できます。詳細については、BigQuery データソースのデータ エージェント コンテキストを定義するをご覧ください。

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id",
                "schema": {
                    "description": "my_table_description",
                    "fields": [{
                        "name": "my_column_name",
                        "description": "my_column_description"
                    }]
                }
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

サンプル値を次のように置き換えます。

  • my_project_id: 接続する BigQuery データセットとテーブルを含む Google Cloud プロジェクトの ID。公開データセットに接続するには、bigquery-public-data を指定します。
  • my_dataset_id: BigQuery データセットの ID。
  • my_table_id: BigQuery テーブルの ID。
  • my_table_description: テーブルの内容と目的の説明(省略可)。
  • my_column_name: 省略可能な説明を指定するテーブルの列の名前。
  • my_column_description: 列の内容と目的の説明(省略可)。

Looker Studio データに接続する

次のサンプルコードは、Looker Studio データソースへの接続を定義します。

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

studio_datasource_id は、データソース ID に置き換えます。

データ エージェントを作成する

次のサンプルコードは、データ エージェントの作成エンドポイントに HTTP POST リクエストを送信してデータ エージェントを作成する方法を示しています。リクエスト ペイロードには次の詳細が含まれます。

  • エージェントの完全なリソース名。この値には、プロジェクト ID、ロケーション、およびエージェントの固有識別子が含まれます。
  • データ エージェントの説明。
  • データ エージェントのコンテキスト。システムの説明(初期設定と認証を構成するで定義)とエージェントが使用するデータソース(データソースに接続するで定義)を含みます。

リクエスト ペイロードに options パラメータを含めることで、Python を使用した高度な分析を有効にすることもできます。options パラメータと、会話用に構成できるオプションの詳細については、REST リソース: projects.locations.dataAgents をご覧ください。

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional

      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントが一意に識別されるようにする識別子。この値は、エージェントのリソース名と data_agent_id URL クエリ パラメータとして使用されます。
  • This is the description of data_agent_1.: データ エージェントの説明。

会話を作成する

次のサンプルコードは、データ エージェントとの会話を作成する方法を示しています。

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

データ エージェントと会話を管理する

次のコードサンプルは、Conversational Analytics API を使用してデータ エージェントと会話を管理する方法を示しています。次のタスクに有用です。

データ エージェントを取得する

次のサンプルコードは、データ エージェント リソース URL に HTTP GET リクエストを送信して既存のデータ エージェントを取得する方法を示しています。

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

前の例で、data_agent_1 は取得するデータ エージェントの ID に置き換えます。

データ エージェントの一覧を取得する

次のコードは、dataAgents エンドポイントに HTTP GET リクエストを送信して、特定のプロジェクトのすべてのデータ エージェントを一覧表示する方法を示しています。

すべてのエージェントを一覧表示するには、プロジェクトに対する geminidataanalytics.dataAgents.list 権限が必要です。この権限を含む IAM ロールの詳細については、事前定義ロールのリストをご覧ください。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

YOUR-BILLING-PROJECT は、課金プロジェクトの ID に置き換えます。

アクセス可能なデータ エージェントの一覧を取得する

次のコードは、dataAgents:listAccessible エンドポイントに HTTP GET リクエストを送信して、特定のプロジェクトで使用可能なすべてのデータ エージェントを一覧表示する方法を示しています。

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
location = "global"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • YOUR-CREATOR-FILTER: データ エージェントの作成者に基づいて適用するフィルタ。有効な値は NONE(デフォルト)、CREATOR_ONLYNOT_CREATOR_ONLY です。

データ エージェントを更新する

次のサンプルコードは、データ エージェントのリソース URL に HTTP PATCH リクエストを送信してデータ エージェントを更新する方法を示しています。リクエスト ペイロードには、変更するフィールドの新しい値が含まれます。リクエスト パラメータには、更新するフィールドを指定する updateMask パラメータが含まれます。

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
location = "global"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • data_agent_1: 更新するデータ エージェントの ID。
  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • Updated description of the data agent.: データ エージェントの新しい説明。

データ エージェントの IAM ポリシーを取得する

次のサンプルコードは、データ エージェントの URL に HTTP POST リクエストを送信して、データ エージェントの IAM ポリシーを取得する方法を示しています。リクエスト ペイロードにはデータ エージェントのパスが含まれます。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • data_agent_1: IAM ポリシーを取得するデータ エージェントの ID。

データ エージェントの IAM ポリシーを設定する

エージェントを共有するには、setIamPolicy メソッドを使用して、特定のエージェントのユーザーに IAM ロールを割り当てます。次のサンプルコードは、バインディングを含むペイロードを使用してデータ エージェントの URL に POST 呼び出しを行う方法を示しています。バインディングでは、どのユーザーにどのロールを割り当てるかを指定します。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • data_agent_1: IAM ポリシーを設定するデータ エージェントの ID。
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: 指定したロールを付与するユーザーのメールアドレスのカンマ区切りのリスト。

データ エージェントを削除する

次のサンプルコードは、データ エージェントのリソース URL に HTTP DELETE リクエストを送信してデータ エージェントを削除する方法を示しています。削除(復元可能)とは、エージェントが削除されても、30 日以内であれば取得できることを意味します。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • data_agent_1: 削除するデータ エージェントの ID。

会話を取得する

次のサンプルコードは、会話リソース URL に HTTP GET リクエストを送信して既存の会話を取得する方法を示しています。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • conversation_1: 取得する会話の ID。

スレッドのリストを取得する

次のサンプルコードは、conversations エンドポイントに HTTP GET リクエストを送信して、特定のプロジェクトの会話を一覧表示する方法を示しています。

デフォルトでは、このメソッドは作成した会話を返します。管理者(cloudaicompanion.topicAdmin IAM ロールを持つユーザー)は、プロジェクト内のすべての会話を表示できます。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

YOUR-BILLING-PROJECT は、必要な API を有効にした課金プロジェクトの ID に置き換えます。

会話内のメッセージを一覧で取得する

次のサンプルコードは、会話の messages エンドポイントに HTTP GET リクエストを送信して、会話内のすべてのメッセージを一覧表示する方法を示しています。

メッセージを一覧表示するには、会話に対する cloudaicompanion.topics.get 権限が必要です。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • conversation_1: メッセージを一覧表示する会話の ID。

会話を削除する

次のサンプルコードは、会話リソース URL に HTTP DELETE リクエストを送信して会話を削除する方法を示しています。管理者(cloudaicompanion.topicAdmin Identity and Access Management ロールを持つユーザー)または cloudaicompanion.topics.delete Identity and Access Management 権限を持つユーザーは、プロジェクト内の会話を削除できます。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.delete(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation deleted successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while deleting conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • conversation_1: 削除する会話の ID。

API を使用して質問する

データ エージェント会話(ステートフル チャットの場合)を作成したら、エージェントにクエリを送信できます。

クエリを送信すると、API は Message オブジェクトのストリームを返します。このストリームには、テキスト、データテーブル、グラフなど、さまざまな種類のメッセージが含まれます。テキスト メッセージでは、エージェントの推論に関する分析情報を提供したり、進行状況を報告したり、最終的な回答を提供することができます。各テキスト メッセージの目的は、TextType 値で示されます。

  • THOUGHT: クエリへの回答方法を計画する際のエージェントの内部思考プロセスを示します。THOUGHT メッセージは、エージェントの推論と意思決定のプロセスに関する分析情報をステップごとに提供します。これは次の 2 つの部分から構成されています。parts[0] は思考の要約で、思考の全文を簡潔に要約したものです。parts[1] は思考の全文です。
  • PROGRESS: データの取得や呼び出されるツールなど、アクションに対するエージェントの進行状況をレポートします。この値は Looker データソースに対してのみ返され、次の 2 つの部分から構成されます。parts[0] は概要、parts[1] は進行状況の全文です。
  • FINAL_RESPONSE: クエリに対する最終的な回答を提供します。

次のセクションの例では、チャット レスポンスをストリーミングするヘルパー Python 関数で定義されているヘルパー関数を使用して、API のストリーミング レスポンス内の各メッセージを処理して表示します。Looker データソースを使用しているときに、これらのメッセージをユーザー インターフェースにレンダリングする方法については、Looker データソースのレスポンスをレンダリングするをご覧ください。

ステートフル チャットとステートレス チャットで質問する

API は、次の主なモードで操作できます。

  • ステートフル リクエスト: Google Cloud が会話履歴を保存して管理します。ステートフル チャットは、API が以前のメッセージのコンテキストを保持するため、本質的にマルチターンです。各ターンで現在のメッセージのみを送信します。
  • ステートレス チャット: アプリケーションが会話履歴を管理します。新しいメッセージには、関連する以前のメッセージを含める必要があります。ステートレス モードでマルチターン会話を管理する方法の詳細な例については、ステートレス マルチターン会話を作成するをご覧ください。

次の例は、:chat エンドポイントに POST リクエストを行うことで、ステートフル チャットとステートレス チャットに API を使用する方法を示しています。

ステートフル チャット

会話参照を含むステートフル チャット リクエストを送信する

次のサンプルコードは、前の手順で定義した会話を使用して API に質問する方法を示しています。このサンプルでは、get_stream ヘルパー関数を使用してレスポンスをストリーミングします。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
  • conversation_1: 会話の固有識別子。
  • サンプル プロンプトとして Make a bar graph for the top 5 states by the total number of airports を使用しました。

ステートレス チャット

次の例は、データ エージェント参照またはインライン コンテキストを使用してステートレス リクエストを送信する方法を示しています。

データ エージェント参照を含むステートレス チャット リクエストを送信する

次のサンプルコードは、前の手順で定義したデータ エージェントを使用して、API にステートレスな質問をする方法を示しています。このサンプルでは、get_stream ヘルパー関数を使用してレスポンスをストリーミングします。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
  • サンプル プロンプトとして Make a bar graph for the top 5 states by the total number of airports を使用しました。

インライン コンテキストを含むステートレス チャット リクエストを送信する

次のサンプルコードは、インライン コンテキストを使用して API にステートレスな質問をする方法を示しています。このサンプルでは、get_stream ヘルパー関数を使用してレスポンスをストリーミングし、BigQuery データソースを例として使用します。

リクエスト ペイロードに options パラメータを含めることで、Python を使用した高度な分析を有効にすることもできます。options パラメータと、会話用に構成できるオプションの詳細については、REST リソース: projects.locations.dataAgents をご覧ください。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

ステートレス マルチターンの会話を作成する

ステートレスな会話でフォローアップの質問をするには、アプリケーションで新しいリクエストごとにメッセージ履歴全体を送信して、会話のコンテキストを管理する必要があります。以降のセクションでは、マルチターン会話を作成するためにヘルパー関数を定義して呼び出す方法について説明します。

マルチターンのリクエストを送信する

次の multi_turn_Conversation ヘルパー関数は、メッセージをリストに保存して会話コンテキストを管理します。これにより、前のターンを基にしたフォローアップの質問を送信できます。関数のペイロードで、データ エージェントを参照するか、インライン コンテキストを使用してデータソースを直接指定できます。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

前の例で、data_agent_1 は、データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID に置き換えます。

会話のターンごとに multi_turn_Conversation ヘルパー関数を呼び出すことができます。次のサンプルコードは、最初のリクエストを送信し、前のレスポンスを基にフォローアップ リクエストを送信する方法を示しています。

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

前の例では、サンプル値を次のように置き換えます。

  • Which species of tree is most prevalent?: データ エージェントに送信する自然言語の質問。
  • Can you show me the results as a bar chart?: 前の質問を基にする、または前の質問を絞り込むフォローアップの質問。

レスポンスを処理する

次の get_stream_multi_turn 関数は、ストリーミング API レスポンスを処理します。この関数は get_stream ヘルパー関数と似ていますが、レスポンスを conversation_messages リストに保存して、次のターンの会話コンテキストを保存します。

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

エンドツーエンドのコードサンプル

次の展開可能なコードサンプルには、このガイドで説明するすべてのタスクが含まれています。

HTTP と Python を使用してデータ エージェントを構築する

    import json
    import json as json_lib
    import textwrap

    import altair as alt
    import google.auth
    from google.auth.transport.requests import Request
    import IPython
    from IPython.display import display, HTML
    import pandas as pd
    from pygments import formatters, highlight, lexers
    import requests

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
        # Do not include the following line during agent creation
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references": [
                {
                  "studio_datasource_id": "studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

次の展開可能なコードサンプルには、チャット レスポンスのストリーミングに使用される Python ヘルパー関数が含まれています。

チャット レスポンスをストリーミングするヘルパー Python 関数

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      full_text = "".join(parts)
      if "\n" not in full_text and len(full_text) > 80:
        wrapped_text = textwrap.fill(full_text, width=80)
        print(wrapped_text)
      else:
        print(full_text)

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        if 'question' in query:
          print('Question: {}'.format(query['question']))
        if 'datasources' in query:
          print('Data sources:')
          for datasource in query['datasources']:
            display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''