HTTP と Python を使用してデータ エージェントを構築する

このページでは、Python を使用して会話分析 APIgeminidataanalytics.googleapis.com を介してアクセス)に HTTP リクエストを行う方法について説明します。

このページの Python コードサンプルは、次のタスクを行う方法を示しています。

このページの最後に、サンプルコードの完全版と、API レスポンスのストリーミングに使用されるヘルパー関数が記載されています。

初期設定と認証を構成する

以下の Python サンプルコードは、次のタスクを実行します。

  • 必要な Python ライブラリをインポートする
  • Google Cloud CLI を使用して HTTP 認証用のアクセス トークンを取得する
  • 課金プロジェクト、ロケーション、システム指示の変数を定義する
import json
import json as json_lib
import textwrap

import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
}

billing_project = 'YOUR-BILLING-PROJECT'
location = "global"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 必要な API を有効にした課金プロジェクトの ID。
  • YOUR-SYSTEM-INSTRUCTIONS: エージェントの動作をガイドし、データのニーズに合わせてカスタマイズするためのシステム指示。たとえば、システム指示を使用してビジネス用語を定義したり、回答の長さを制御したり、データの形式を設定できます。理想的には、効果的なシステム指示を記述するで推奨されている YAML 形式でシステム指示を定義し、構造化された詳細なガイダンスを提供します。

Looker で認証を行う

Looker データソースに接続する場合は、Looker インスタンスで認証を行う必要があります。

API キーの使用

次の Python コードサンプルは、API キーを使用してエージェントを Looker インスタンスに対して認証する方法を示しています。

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

サンプル値を次のように置き換えます。

  • YOUR-LOOKER-CLIENT-ID: 生成された Looker API キーのクライアント ID。
  • YOUR-LOOKER-CLIENT-SECRET: 生成された Looker API キーのクライアント シークレット。

アクセス トークンを使用する

次の Python コードサンプルは、アクセス トークンを使用してエージェントを Looker インスタンスに対して認証する方法を示しています。

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

サンプル値を次のように置き換えます。

  • YOUR-TOKEN: Looker の認証用に生成する access_token 値。

AlloyDB for PostgreSQL に対する認証を行う

AlloyDB データソースに接続する場合は、gcloud CLI を使用して AlloyDB インスタンスで認証を行います。

  1. AlloyDB インスタンスで alloydb.iam_authentication フラグを有効にして、AlloyDB で Identity and Access Management(IAM)認証を有効にします。
  2. IAM ロールを付与します。Conversational Analytics API を呼び出すプリンシパル(ユーザーまたはサービス アカウント)には、Google Cloud プロジェクトで適切な IAM 権限が必要です。通常、これには alloydb.databaseUser などのロールが含まれます。
  3. 対応するデータベース ユーザーを作成します。ユーザーまたはサービス アカウントのメールアドレスをユーザー名として使用して、IAM プリンシパルをミラーリングする AlloyDB ユーザーを作成します。

詳細については、IAM データベース認証をご覧ください。

Cloud SQL for MySQL と Cloud SQL for PostgreSQL に対する認証

Cloud SQL for MySQL または Cloud SQL for PostgreSQL データソースに接続する場合は、Cloud SQL インスタンスに対する認証を行う必要があります。

  1. IAM データベース認証を有効にします。Cloud SQL for MySQL インスタンスで cloudsql.iam_authentication フラグが on に設定されていることを確認します。これは、インスタンスの作成時または既存のインスタンスのパッチ適用時に追加できます。

    gcloud sql instances patch INSTANCE_NAME --database-flags
    cloudsql.iam_authentication=on
    
  2. 必要な IAM ロールを付与します。

    接続を試みるプリンシパル(ユーザーまたはサービス アカウント)には、プロジェクトに対する cloudsql.instances.logincloudsql.instances.connect の IAM 権限が必要です。これらのロールと権限を付与するには、次のいずれかを行います。

    1. roles/cloudsql.instanceUser ロールを付与します。これには、インスタンスにログインする権限(cloudsql.instances.login)と、プロキシを使用して接続する権限(cloudsql.instances.connect)が含まれます。
    2. または、少なくとも cloudsql.instances.logincloudsql.instances.connect を含むカスタムロールを付与します。
  3. IAM プリンシパルをデータベース ユーザーとして追加します。IAM プリンシパルのメールアドレスに対応するユーザーを Cloud SQL インスタンスに作成します。

    サービス アカウントの場合は、次のコマンドを実行します。

    gcloud sql users create SERVICE_ACCOUNT_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_service_account
    

    ユーザー アカウントの場合は、次のコマンドを実行します。

    gcloud sql users create USER_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_user
    
  4. データベース権限を付与します。IAM で接続すると、ユーザーはデータベースに対して認証されますが、データベースで権限が自動的に付与されることはありません。標準の PostgreSQL コマンド(GRANT など)を使用して、この新しいデータベース ユーザーにテーブルやスキーマなどの特定のオブジェクトに対する権限を付与する必要があります。

  5. デフォルトの postgres ユーザーなどのスーパーユーザーとして接続し、次のコマンドを実行します。

    -- Example: Connect using psql as the 'postgres' user
    
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO
    "user-or-service-account@example.com";
    
  6. IAM 認証を使用して接続します。

    通常、クライアントは Cloud SQL Auth Proxy またはコネクタ ライブラリを介して接続し、トークンの交換を処理します。データベース ユーザー名は、IAM プリンシパルのメールアドレスです。パスワードは使用されません。代わりに、通常は Cloud SQL Auth Proxy または Google Cloud CLI によって自動的に処理される OAuth 2.0 トークンが使用されます。

    次の例では、psql と gcloud CLI を使用してトークンを生成します。

    PGPASSWORD=$(gcloud sql generate-login-token) psql --host=HOST_IP \
    --username=IAM_PRINCIPAL_EMAIL --dbname=DATABASE_NAME\
    

    詳細については、IAM 認証をご覧ください。

    サンプル値を次のように置き換えます。

    • INSTANCE_NAME: IAM データベース認証を有効にするためにパッチを適用する Cloud SQL インスタンスの ID。
    • SERVICE_ACCOUNT_EMAIL: サービス アカウントのメールアドレス。
    • USER_EMAIL: ユーザー アカウントのメールアドレス。
    • HOST_IP: Cloud SQL インスタンスの IP アドレス。
    • IAM_PRINCIPAL_EMAIL: IAM プリンシパル(サービス アカウントやユーザーなど)のメールアドレス。
    • DATABASE_NAME: 接続先のデータベースの名前。

Spanner で認証する

Spanner データソースに接続する場合は、Spanner インスタンスで認証を行う必要があります。

  1. IAM ロールを付与します。Conversational Analytics API を呼び出すプリンシパル(ユーザーまたはサービス アカウント)には、Google Cloud プロジェクトで適切な IAM 権限が必要です。通常、これには databaseUser などのロールが含まれます。
  2. データ定義言語(DDL)を使用して、Spanner に対応する抽象データベース ロールを作成します。
  3. IAM ポリシーを使用して、IAM プリンシパルがこれらのデータベース ロールを引き受けるようにします。

詳細については、きめ細かいアクセス制御の権限をご覧ください。

データソースに接続する

以降のセクションでは、エージェントのデータソースの接続の詳細を定義する方法について説明します。エージェントは、次の方法でデータに接続できます。

Looker データに接続する

次のサンプルコードは、Looker Explore への接続を定義します。Looker インスタンスとの接続を確立するには、会話分析 API を使用してデータソースの認証と接続を行うで説明されているように、Looker API キーが生成されていることを確認します。Conversational Analytics API を使用すると、一度に最大 5 つの Looker Explore に接続できます。

Looker データソースに接続する場合は、次の点に注意してください。

  • 会話で、含まれている Explore をクエリできます。
  • エージェントが一度にクエリできる Explore は 1 つのみです。複数のデータ探索にまたがって同時にクエリを実行することはできません。
  • エージェントは、同じ会話で複数の探索に対してクエリを実行できます。
  • エージェントは、複数の部分から構成される質問を含む会話や、フォローアップの質問を含む会話で、複数の探索をクエリできます。

    たとえば、ユーザーが 2 つの Explore(cat-exploredog-explore)を接続します。ユーザーが「猫の数と犬の数ではどちらが多いですか?」という質問を入力します。これにより、cat-explore の猫の数をカウントするクエリと、dog-explore の犬の数をカウントするクエリの 2 つが作成されます。エージェントは、両方のクエリを完了した後、両方のクエリの数値を比較します。

次のサンプルコードは、複数の Looker Explore への接続を定義します。エージェントのパフォーマンスを向上させるために、Explore の構造化コンテキストとしてゴールデン クエリを必要に応じて指定できます。詳細については、Looker データソースのデータ エージェント コンテキストを定義するをご覧ください。

looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
    "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
       # Do not include the following line during agent creation
       "credentials": looker_credentials
    }
}

サンプル値を次のように置き換えます。

  • https://your_company.looker.com: Looker インスタンスの完全な URL。
  • your_model: 接続する Explore を含む LookML モデルの名前。
  • your_explore: データ エージェントがクエリを実行する Looker Explore の名前。
  • your_model_2: 接続する Explore を含む 2 番目の LookML モデルの名前。この変数は、最大 5 つの Explore の追加モデルに対して繰り返すことができます。
  • your_explore_2: データ エージェントがクエリを実行する追加の Looker Explore の名前。この変数を繰り返して、最大 5 つの探索を含めることができます。

BigQuery データに接続する

Conversational Analytics API では、接続できる BigQuery テーブルの数に上限はありません。ただし、多数のテーブルに接続すると、精度が低下したり、モデルの入力トークン上限を超えたりする可能性があります。

次のサンプルコードは、複数の BigQuery テーブルへの接続を定義し、オプションの構造化コンテキスト フィールドの例を示しています。エージェントのパフォーマンスを向上させるために、テーブルと列の説明、同義語、タグ、クエリの例など、BigQuery テーブルの構造化コンテキストを必要に応じて指定できます。詳細については、BigQuery データソースのデータ エージェント コンテキストを定義するをご覧ください。

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id",
                "schema": {
                    "description": "my_table_description",
                    "fields": [{
                        "name": "my_column_name",
                        "description": "my_column_description"
                    }]
                }
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

サンプル値を次のように置き換えます。

  • my_project_id: 接続する BigQuery データセットとテーブルを含む Google Cloud プロジェクトの ID。公開データセットに接続するには、bigquery-public-data を指定します。
  • my_dataset_id: BigQuery データセットの ID。
  • my_table_id: BigQuery テーブルの ID。
  • my_table_description: テーブルの内容と目的の説明(省略可)。
  • my_column_name: 省略可能な説明を指定するテーブルの列の名前。
  • my_column_description: 列の内容と目的の説明(省略可)。

Looker Studio データに接続する

次のサンプルコードは、Looker Studio データソースへの接続を定義します。

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

studio_datasource_id は、データソース ID に置き換えます。

AlloyDB データに接続する

会話型分析 API は、alloydb フィールドの database_reference フィールドを使用して AlloyDB クラスタに接続します。

次の例では、AlloyDB データベースへの接続を定義しています。

alloydb_data_source = {
    "alloydb": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }

    }
}

サンプル値を次のように置き換えます。

  • PROJECT_ID: AlloyDB クラスタを含む Google Cloud プロジェクトの ID。
  • REGION: AlloyDB クラスタが存在するリージョン(us-central1 など)。
  • CLUSTER_ID: AlloyDB クラスタの ID。
  • INSTANCE_ID: AlloyDB インスタンスの ID。
  • DATABASE: ターゲット データベースの名前(例: financial)。
  • TABLE_1_IDTABLE_2_ID: 省略可。データ エージェントが使用することを推奨するデータベース内のテーブルのリスト(例: "loan""client""disp")。
  • billing_project: 必要な API を有効にした課金プロジェクトの ID。
  • location: コンテキスト セットが存在するロケーション(例: global)。
  • your_context_set_id: エージェントの高度なコンテキストを作成した場合は、コンテキスト セットの ID。

Cloud SQL for MySQL と Cloud SQL for PostgreSQL のデータに接続する

会話型分析 API は、sql フィールドの database_reference フィールドを使用して Cloud SQL インスタンスに接続します。

次の例では、Cloud SQL for MySQL または Cloud SQL for PostgreSQL データベースへの接続を定義します。

sql_data_source = {
    "cloud_sql_reference": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE_ID",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }
    }
}

サンプル値を次のように置き換えます。

  • PROJECT_ID: Cloud SQL インスタンスを含む Google Cloud プロジェクトの ID。
  • REGION: Cloud SQL インスタンスが存在するリージョン(例: us-central1)。
  • CLUSTER_ID: Cloud SQL クラスタの ID。
  • INSTANCE_ID: Cloud SQL インスタンスの ID。
  • DATABASE_ID: ターゲット データベースの名前(例: financial)。
  • TABLE_1_IDTABLE_2_ID: 省略可。データ エージェントが使用することを推奨するデータベース内のテーブルのリスト(例: "loan""client""disp")。
  • billing_project: 必要な API を有効にした課金プロジェクトの ID。
  • location: コンテキスト セットが存在するロケーション(例: global)。
  • your_context_set_id: エージェントの高度なコンテキストを作成した場合は、コンテキスト セットの ID。

Spanner データに接続する

会話型分析 API は、spanner フィールドの database_reference フィールドを使用して Spanner インスタンスに接続します。

次の例では、Spanner データベースへの接続を定義します。

spanner_data_sources = {
    "spanner_reference": {
        "database_reference": {
            "project_id": "PROJECT_ID",
            "region": location,
            "engine": GOOGLE_SQL,
            "instance_id": "INSTANCE_ID",
            "database_id": "DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
        },
        # Optional: Include this if you have pre-authored context for the agent
        # "agent_context_reference": {
        #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
        # }
    }
}

サンプル値を次のように置き換えます。

  • PROJECT_ID: Spanner インスタンスが含まれている Google Cloud プロジェクトの ID。
  • INSTANCE_ID: Spanner インスタンスの ID。
  • DATABASE: ターゲット データベースの名前(例: financial)。
  • TABLE_1_IDTABLE_2_ID: データ エージェントが使用することを推奨するデータベース内のテーブルのリスト("loan""client""disp" など)。
  • billing_project: 必要な API を有効にした課金プロジェクトの ID。
  • location: コンテキスト セットが存在するロケーション(例: global)。
  • your_context_set_id: エージェントの高度なコンテキストを作成した場合は、コンテキスト セットの ID。

データ エージェントを作成する

次のサンプルコードは、データ エージェントの作成エンドポイントに HTTP POST リクエストを送信して、データ エージェントを同期または非同期で作成する方法を示しています。リクエスト ペイロードには次の詳細が含まれます。

  • エージェントの完全なリソース名。この値には、プロジェクト ID、ロケーション、およびエージェントの固有識別子が含まれます。
  • データ エージェントの説明。
  • データ エージェントのコンテキスト。システムの説明(初期設定と認証を構成するで定義)とエージェントが使用するデータソース(データソースに接続するで定義)を含みます。

必要に応じて、作成時に kms_key を指定して、顧客管理の暗号鍵(CMEK)でデータ エージェントを保護できます。CMEK は Looker データソースでのみサポートされています。詳細については、顧客管理の暗号鍵(CMEK)をご覧ください。

リクエスト ペイロードに options パラメータを含めることで、Python を使用した高度な分析を有効にすることもできます。options パラメータと、会話用に構成できるオプションの詳細については、REST リソース: projects.locations.dataAgents をご覧ください。

同期

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents:createSync"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

非同期

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントが一意に識別されるようにする識別子。この値は、エージェントのリソース名と data_agent_id URL クエリ パラメータとして使用されます。
  • KMS_PROJECT_ID: CMEK を使用している場合は、鍵がホストされているプロジェクト ID。指定しない場合、デフォルトは課金プロジェクトになります。
  • KEY_RING_NAME: CMEK を使用している場合は、Cloud KMS キーリングの名前。
  • KEY_NAME: CMEK を使用している場合は、Cloud KMS 鍵の名前。
  • This is the description of data_agent_1.: データ エージェントの説明。

会話を作成する

次のサンプルコードは、データ エージェントとの会話を作成する方法を示しています。

必要に応じて、作成時に kms_key を指定して、CMEK で会話を保護できます。CMEK は Looker データソースでのみサポートされています。詳細については、顧客管理の暗号鍵(CMEK)をご覧ください。

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
}

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  conversation_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
  • conversation_1: 会話の固有識別子。
  • KMS_PROJECT_ID: CMEK を使用している場合は、鍵がホストされているプロジェクト ID。指定しない場合、デフォルトは課金プロジェクトになります。
  • KEY_RING_NAME: CMEK を使用している場合は、Cloud KMS キーリングの名前。
  • KEY_NAME: CMEK を使用している場合は、Cloud KMS 鍵の名前。

データ エージェントと会話を管理する

次のコードサンプルは、Conversational Analytics API を使用してデータ エージェントと会話を管理する方法を示しています。次のタスクに有用です。

データ エージェントを取得する

次のサンプルコードは、データ エージェント リソース URL に HTTP GET リクエストを送信して既存のデータ エージェントを取得する方法を示しています。

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

前の例で、data_agent_1 は取得するデータ エージェントの ID に置き換えます。

データ エージェントの一覧を取得する

次のコードは、dataAgents エンドポイントに HTTP GET リクエストを送信して、特定のプロジェクトのすべてのデータ エージェントを一覧表示する方法を示しています。

すべてのエージェントを一覧表示するには、プロジェクトに対する geminidataanalytics.dataAgents.list 権限が必要です。この権限を含む IAM ロールの詳細については、事前定義ロールのリストをご覧ください。

billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

YOUR-BILLING-PROJECT は、課金プロジェクトの ID に置き換えます。

アクセス可能なデータ エージェントの一覧を取得する

次のコードは、dataAgents:listAccessible エンドポイントに HTTP GET リクエストを送信して、特定のプロジェクトで使用可能なすべてのデータ エージェントを一覧表示する方法を示しています。

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • YOUR-CREATOR-FILTER: データ エージェントの作成者に基づいて適用するフィルタ。有効な値は NONE(デフォルト)、CREATOR_ONLYNOT_CREATOR_ONLY です。

データ エージェントを更新する

次のサンプルコードは、データ エージェントのリソース URL に HTTP PATCH リクエストを送信して、データ エージェントを同期または非同期で更新する方法を示しています。リクエスト ペイロードには、変更するフィールドの新しい値が含まれます。リクエスト パラメータには、更新するフィールドを指定する updateMask パラメータが含まれます。

同期

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:updateSync"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

非同期

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • data_agent_1: 更新するデータ エージェントの ID。
  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • Updated description of the data agent.: データ エージェントの新しい説明。

データ エージェントの IAM ポリシーを取得する

次のサンプルコードは、データ エージェントの URL に HTTP POST リクエストを送信して、データ エージェントの IAM ポリシーを取得する方法を示しています。リクエスト ペイロードにはデータ エージェントのパスが含まれます。

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • data_agent_1: IAM ポリシーを取得するデータ エージェントの ID。

データ エージェントの IAM ポリシーを設定する

エージェントを共有するには、setIamPolicy メソッドを使用して、特定のエージェントのユーザーに IAM ロールを割り当てます。次のサンプルコードは、バインディングを含むペイロードを使用してデータ エージェントの URL に POST 呼び出しを行う方法を示しています。バインディングでは、どのユーザーにどのロールを割り当てるかを指定します。

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • data_agent_1: IAM ポリシーを設定するデータ エージェントの ID。
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: 指定したロールを付与するユーザーのメールアドレスのカンマ区切りのリスト。

データ エージェントを削除する

次のサンプルコードは、データ エージェントのリソース URL に HTTP DELETE リクエストを送信して、データ エージェントを同期または非同期で削除する方法を示しています。削除(復元可能)とは、エージェントが削除されても、30 日以内であれば取得できることを意味します。

同期

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:deleteSync"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

非同期

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • data_agent_1: 削除するデータ エージェントの ID。

会話を取得する

次のサンプルコードは、会話リソース URL に HTTP GET リクエストを送信して既存の会話を取得する方法を示しています。

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • conversation_1: 取得する会話の ID。

スレッドのリストを取得する

次のサンプルコードは、conversations エンドポイントに HTTP GET リクエストを送信して、特定のプロジェクトの会話を一覧表示する方法を示しています。

デフォルトでは、このメソッドは作成した会話を返します。管理者(cloudaicompanion.topicAdmin IAM ロールを持つユーザー)は、プロジェクト内のすべての会話を表示できます。

billing_project = "YOUR-BILLING-PROJECT"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

YOUR-BILLING-PROJECT は、必要な API を有効にした課金プロジェクトの ID に置き換えます。

会話内のメッセージを一覧で取得する

次のサンプルコードは、会話の messages エンドポイントに HTTP GET リクエストを送信して、会話内のすべてのメッセージを一覧表示する方法を示しています。

メッセージを一覧表示するには、会話に対する cloudaicompanion.topics.get 権限が必要です。

billing_project = "YOUR-BILLING-PROJECT"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • conversation_1: メッセージを一覧表示する会話の ID。

会話を削除する

次のサンプルコードは、会話リソース URL に HTTP DELETE リクエストを送信して会話を削除する方法を示しています。管理者(cloudaicompanion.topicAdmin Identity and Access Management ロールを持つユーザー)または cloudaicompanion.topics.delete Identity and Access Management 権限を持つユーザーは、プロジェクト内の会話を削除できます。

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.delete(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation deleted successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while deleting conversation: {conversation_response.status_code}")
    print(conversation_response.text)

サンプル値を次のように置き換えます。

  • YOUR-BILLING-PROJECT: 課金プロジェクトの ID。
  • conversation_1: 削除する会話の ID。

API を使用して質問する

データ エージェント会話(ステートフル チャットの場合)を作成したら、エージェントにクエリを送信できます。

クエリを送信すると、API は Message オブジェクトのストリームを返します。このストリームには、テキスト、データテーブル、グラフなど、さまざまな種類のメッセージが含まれます。テキスト メッセージでは、エージェントの推論に関する分析情報を提供したり、進行状況を報告したり、最終的な回答を提供することができます。各テキスト メッセージの目的は、TextType 値で示されます。

  • THOUGHT: クエリへの回答方法を計画する際のエージェントの内部思考プロセスを示します。THOUGHT メッセージは、エージェントの推論と意思決定のプロセスに関する分析情報をステップごとに提供します。これは次の 2 つの部分から構成されています。parts[0] は思考の要約で、思考の全文を簡潔に要約したものです。parts[1] は思考の全文です。
  • PROGRESS: データの取得や呼び出されるツールなど、アクションに対するエージェントの進行状況をレポートします。この値は Looker データソースに対してのみ返され、次の 2 つの部分から構成されます。parts[0] は概要、parts[1] は進行状況の全文です。
  • FINAL_RESPONSE: クエリに対する最終的な回答を提供します。

次のセクションの例では、チャット レスポンスをストリーミングするヘルパー Python 関数で定義されているヘルパー関数を使用して、API のストリーミング レスポンス内の各メッセージを処理して表示します。Looker データソースを使用しているときに、これらのメッセージをユーザー インターフェースにレンダリングする方法については、Looker データソースのレスポンスをレンダリングするをご覧ください。

ステートフル チャットとステートレス チャットで質問する

API は、次の主なモードで操作できます。

  • ステートフル リクエスト: Google Cloud が会話履歴を保存して管理します。ステートフル チャットは、API が以前のメッセージのコンテキストを保持するため、本質的にマルチターンです。各ターンで現在のメッセージのみを送信します。
  • ステートレス チャット: アプリケーションが会話履歴を管理します。新しいメッセージには、関連する以前のメッセージを含める必要があります。ステートレス モードでマルチターン会話を管理する方法の詳細な例については、ステートレス マルチターン会話を作成するをご覧ください。

次の例は、:chat エンドポイントに POST リクエストを行うことで、ステートフル チャットとステートレス チャットに API を使用する方法を示しています。

ステートフル チャット

会話参照を含むステートフル チャット リクエストを送信する

次のサンプルコードは、前の手順で定義した会話を使用して API に質問する方法を示しています。このサンプルでは、get_stream ヘルパー関数を使用してレスポンスをストリーミングします。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
  • conversation_1: 会話の固有識別子。
  • サンプル プロンプトとして Make a bar graph for the top 5 states by the total number of airports を使用しました。

ステートレス チャット

次の例は、データ エージェント参照またはインライン コンテキストを使用してステートレス リクエストを送信する方法を示しています。

データ エージェント参照を含むステートレス チャット リクエストを送信する

次のサンプルコードは、前の手順で定義したデータ エージェントを使用して、API にステートレスな質問をする方法を示しています。このサンプルでは、get_stream ヘルパー関数を使用してレスポンスをストリーミングします。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
  • サンプル プロンプトとして Make a bar graph for the top 5 states by the total number of airports を使用しました。

インライン コンテキストを含むステートレス チャット リクエストを送信する

次のサンプルコードは、インライン コンテキストを使用して API にステートレスな質問をする方法を示しています。このサンプルでは、get_stream ヘルパー関数を使用してレスポンスをストリーミングし、BigQuery データソースを例として使用します。

リクエスト ペイロードに options パラメータを含めることで、Python を使用した高度な分析を有効にすることもできます。options パラメータと、会話用に構成できるオプションの詳細については、REST リソース: projects.locations.dataAgents をご覧ください。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

次のサンプルコードは、インライン コンテキストと AlloyDB データソースを例として使用して、API にステートレスな質問をする方法を示しています。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

# The natural language question to ask the data agent
user_prompt = "what is the average loan amount in the US?"  # Replace with your question

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": user_prompt
            }
        }
    ],
    "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
    }
}

print(f"Sending prompt to :chat: '{user_prompt}'")
print(f"Endpoint: {chat_url}")
print(f"Payload: {json.dumps(chat_payload, indent=2)}")

get_stream(chat_url, chat_payload)

ステートレス マルチターンの会話を作成する

ステートレスな会話でフォローアップの質問をするには、アプリケーションで新しいリクエストごとにメッセージ履歴全体を送信して、会話のコンテキストを管理する必要があります。以降のセクションでは、マルチターン会話を作成するためにヘルパー関数を定義して呼び出す方法について説明します。

マルチターンのリクエストを送信する

次の multi_turn_Conversation ヘルパー関数は、メッセージをリストに保存して会話コンテキストを管理します。これにより、前のターンを基にしたフォローアップの質問を送信できます。関数のペイロードで、データ エージェントを参照するか、インライン コンテキストを使用してデータソースを直接指定できます。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

前の例で、data_agent_1 は、データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID に置き換えます。

会話のターンごとに multi_turn_Conversation ヘルパー関数を呼び出すことができます。次のサンプルコードは、最初のリクエストを送信し、前のレスポンスを基にフォローアップ リクエストを送信する方法を示しています。

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

前の例では、サンプル値を次のように置き換えます。

  • Which species of tree is most prevalent?: データ エージェントに送信する自然言語の質問。
  • Can you show me the results as a bar chart?: 前の質問を基にする、または前の質問を絞り込むフォローアップの質問。

レスポンスを処理する

次の get_stream_multi_turn 関数は、ストリーミング API レスポンスを処理します。この関数は get_stream ヘルパー関数と似ていますが、レスポンスを conversation_messages リストに保存して、次のターンの会話コンテキストを保存します。

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

エンドツーエンドのコードサンプル

次の展開可能なコードサンプルには、このガイドで説明するすべてのタスクが含まれています。

HTTP と Python を使用して BigQuery と Looker のデータ エージェントを構築する

    import json
    import json as json_lib
    import textwrap

    import altair as alt
    import google.auth
    from google.auth.transport.requests import Request
    import IPython
    from IPython.display import display, HTML
    import pandas as pd
    from pygments import formatters, highlight, lexers
    import requests

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
        # Do not include the following line during agent creation
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references": [
                {
                  "studio_datasource_id": "studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

ヘルパー関数を定義する

次のサンプルコードには、前のコードサンプルで使用されているヘルパー関数の定義が含まれています。これらの関数は、API からのレスポンスを解析して結果を表示するのに役立ちます。

チャット レスポンスをストリーミングするヘルパー Python 関数

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      full_text = "".join(parts)
      if "\n" not in full_text and len(full_text) > 80:
        wrapped_text = textwrap.fill(full_text, width=80)
        print(wrapped_text)
      else:
        print(full_text)

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        if 'question' in query:
          print('Question: {}'.format(query['question']))
        if 'datasources' in query:
          print('Data sources:')
          for datasource in query['datasources']:
            display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''