Créer un agent de données à l'aide de HTTP et Python

Cette page explique comment utiliser Python pour envoyer des requêtes HTTP à l'API Conversational Analytics (accessible via geminidataanalytics.googleapis.com).

L'exemple de code Python sur cette page montre comment effectuer les tâches suivantes :

La version complète de l'exemple de code est disponible à la fin de la page, ainsi que les fonctions d'assistance utilisées pour diffuser la réponse de l'API.

Configurer les paramètres initiaux et l'authentification

L'exemple de code Python suivant effectue les tâches ci-dessous :

  • Importe les bibliothèques Python requises
  • Obtient un jeton d'accès pour l'authentification HTTP à l'aide de Google Cloud CLI
  • Définit les variables pour le projet de facturation, l'emplacement et les instructions système
import json
import json as json_lib
import textwrap

import altair as alt
import google.auth
from google.auth.transport.requests import Request
import IPython
from IPython.display import display, HTML
import pandas as pd
from pygments import highlight, lexers, formatters
import requests

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
}

billing_project = 'YOUR-BILLING-PROJECT'
location = "global"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID du projet de facturation dans lequel vous avez activé les API requises.
  • YOUR-SYSTEM-INSTRUCTIONS : instructions système pour guider le comportement de l'agent et le personnaliser en fonction de vos besoins en données. Par exemple, vous pouvez utiliser des instructions système pour définir des termes métier, contrôler la longueur des réponses ou définir le format des données. Dans l'idéal, définissez les instructions système en utilisant le format YAML recommandé dans Rédiger des instructions système efficaces pour fournir des conseils détaillés et structurés.

S'authentifier auprès de Looker

Si vous prévoyez de vous connecter à une source de données Looker, vous devrez vous authentifier auprès de l'instance Looker.

Utiliser des clés API

L'exemple de code Python suivant montre comment authentifier votre agent auprès d'une instance Looker à l'aide de clés API.

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

Remplacez les exemples de valeurs comme suit :

  • YOUR-LOOKER-CLIENT-ID : ID client de la clé API Looker générée.
  • YOUR-LOOKER-CLIENT-SECRET : code secret du client pour la clé API Looker générée.

Utiliser des jetons d'accès

L'exemple de code Python suivant montre comment authentifier votre agent auprès d'une instance Looker à l'aide de jetons d'accès.

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

Remplacez les exemples de valeurs comme suit :

  • YOUR-TOKEN : valeur access_token que vous générez pour vous authentifier auprès de Looker.

S'authentifier auprès d'AlloyDB pour PostgreSQL

Si vous prévoyez de vous connecter à une source de données AlloyDB, utilisez la gcloud CLI pour vous authentifier auprès de l'instance AlloyDB.

  1. Activez l'authentification Identity and Access Management (IAM) sur AlloyDB en activant l'option alloydb.iam_authentication sur l'instance AlloyDB.
  2. Attribuez des rôles IAM. Le compte principal (utilisateur ou compte de service) qui appelle l'API Conversational Analytics doit disposer des autorisations IAM appropriées dans votre projetGoogle Cloud . Cela inclut généralement des rôles tels que alloydb.databaseUser.
  3. Créez un utilisateur de base de données correspondant. Créez un utilisateur AlloyDB qui reflète le compte principal IAM, en utilisant l'adresse e-mail de l'utilisateur ou du compte de service comme nom d'utilisateur.

Pour en savoir plus, consultez Authentification IAM pour les bases de données.

S'authentifier auprès de Cloud SQL pour MySQL et Cloud SQL pour PostgreSQL

Si vous prévoyez de vous connecter à une source de données Cloud SQL pour MySQL ou Cloud SQL pour PostgreSQL, vous devez vous authentifier auprès de l'instance Cloud SQL.

  1. Activez l'authentification IAM pour les bases de données. Assurez-vous que le flag cloudsql.iam_authentication est défini sur on dans votre instance Cloud SQL pour MySQL. Vous pouvez l'ajouter lorsque vous créez une instance ou lorsque vous corrigez une instance existante.

    gcloud sql instances patch INSTANCE_NAME --database-flags
    cloudsql.iam_authentication=on
    
  2. Attribuez les rôles IAM requis.

    Le compte principal (utilisateur ou compte de service) qui tente de se connecter doit disposer des autorisations IAM cloudsql.instances.login et cloudsql.instances.connect sur le projet. Pour accorder ces rôles et autorisations, procédez comme suit :

    1. Attribuez le rôle roles/cloudsql.instanceUser. Cela inclut les autorisations de se connecter à l'instance (cloudsql.instances.login) et de se connecter à l'aide du proxy (cloudsql.instances.connect).
    2. Vous pouvez également attribuer un rôle personnalisé contenant au moins cloudsql.instances.login et cloudsql.instances.connect.
  3. Ajoutez le compte principal IAM en tant qu'utilisateur de la base de données. Créez un utilisateur dans l'instance Cloud SQL qui correspond à l'adresse e-mail du principal IAM.

    Pour un compte de service, exécutez la commande suivante :

    gcloud sql users create SERVICE_ACCOUNT_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_service_account
    

    Pour un compte utilisateur, exécutez la commande suivante :

    gcloud sql users create USER_EMAIL --instance=INSTANCE_NAME
    --type=cloud_iam_user
    
  4. Accorder des droits pour une base de données La connexion avec IAM authentifie l'utilisateur auprès de la base de données, mais n'accorde pas automatiquement d'autorisations dans la base de données. Vous devez utiliser des commandes PostgreSQL standards, comme GRANT, pour accorder à ce nouvel utilisateur de base de données des autorisations sur des objets spécifiques tels que des tables et des schémas.

  5. Connectez-vous en tant que superutilisateur (comme l'utilisateur postgres par défaut) et exécutez les commandes suivantes :

    -- Example: Connect using psql as the 'postgres' user
    
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO
    "user-or-service-account@example.com";
    
  6. Connectez-vous à l'aide de l'authentification IAM.

    Les clients se connectent généralement via le proxy d'authentification Cloud SQL ou une bibliothèque de connecteurs, qui gère l'échange de jetons. Le nom d'utilisateur de la base de données correspond à l'adresse e-mail du principal IAM. Les mots de passe ne sont pas utilisés. Un jeton OAuth 2.0, généralement géré automatiquement par le proxy d'authentification Cloud SQL ou par Google Cloud CLI, est utilisé à la place.

    L'exemple suivant utilise psql et la gcloud CLI pour générer un jeton :

    PGPASSWORD=$(gcloud sql generate-login-token) psql --host=HOST_IP \
    --username=IAM_PRINCIPAL_EMAIL --dbname=DATABASE_NAME\
    

    Pour en savoir plus, consultez la page Authentification IAM.

    Remplacez les exemples de valeurs comme suit :

    • INSTANCE_NAME : ID de l'instance Cloud SQL que vous souhaitez corriger pour activer l'authentification IAM pour les bases de données.
    • SERVICE_ACCOUNT_EMAIL : adresse e-mail du compte de service.
    • USER_EMAIL : adresse e-mail du compte utilisateur.
    • HOST_IP : adresse IP de l'instance Cloud SQL.
    • IAM_PRINCIPAL_EMAIL : adresse e-mail du compte principal IAM (compte de service ou utilisateur, par exemple).
    • DATABASE_NAME : nom de la base de données à laquelle se connecter.

S'authentifier auprès de Spanner

Si vous prévoyez de vous connecter à une source de données Spanner, vous devez vous authentifier auprès de l'instance Spanner.

  1. Attribuez des rôles IAM. Le compte principal (utilisateur ou compte de service) qui appelle l'API Conversational Analytics doit disposer des autorisations IAM appropriées dans votre projetGoogle Cloud . Cela inclut généralement des rôles tels que databaseUser.
  2. Créez des rôles de base de données abstraits correspondants dans Spanner à l'aide du langage de définition de données (LDD).
  3. Utilisez des stratégies IAM pour permettre aux comptes principaux IAM d'assumer ces rôles de base de données.

Pour en savoir plus, consultez Privilèges de contrôle des accès ultraprécis.

Vous connecter à une source de données

Les sections suivantes montrent comment définir les détails de connexion pour les sources de données de votre agent. Votre agent peut se connecter aux données de différentes manières :

Se connecter aux données Looker

L'exemple de code suivant définit une connexion à une exploration Looker. Pour établir une connexion à une instance Looker, vérifiez que vous avez généré des clés API Looker, comme décrit sur la page S'authentifier et se connecter à une source de données avec l'API Conversational Analytics. L'API Conversational Analytics vous permet de vous connecter à un maximum de cinq explorations Looker à la fois.

Lorsque vous vous connectez à une source de données Looker, tenez compte des points suivants :

  • Vous pouvez interroger n'importe quelle exploration incluse dans une conversation.
  • Un agent ne peut interroger qu'une seule exploration à la fois. Il n'est pas possible d'effectuer des requêtes dans plusieurs Explorations simultanément.
  • Un agent peut interroger plusieurs explorations dans la même conversation.
  • Un agent peut interroger plusieurs Explorations dans une conversation qui inclut des questions en plusieurs parties ou des questions de suivi.

    Par exemple, un utilisateur connecte deux explorations, l'une appelée cat-explore et l'autre dog-explore. L'utilisateur saisit la question "Qu'est-ce qui est le plus grand : le nombre de chats ou le nombre de chiens ?" Cela créerait deux requêtes : une pour compter le nombre de chats dans cat-explore et une pour compter le nombre de chiens dans dog-explore. L'agent compare les nombres des deux requêtes après les avoir effectuées.

L'exemple de code suivant définit une connexion à plusieurs explorations Looker. Pour améliorer les performances de l'agent, vous pouvez éventuellement fournir des requêtes de référence comme contexte structuré pour vos explorations. Pour en savoir plus, consultez Définir le contexte de l'agent de données pour les sources de données Looker.

looker_instance_uri = "https://your_company.looker.com"
looker_data_source = {
    "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
       # Do not include the following line during agent creation
       "credentials": looker_credentials
    }
}

Remplacez les exemples de valeurs comme suit :

  • https://your_company.looker.com : URL complète de votre instance Looker.
  • your_model : nom du modèle LookML qui inclut l'exploration à laquelle vous souhaitez vous connecter.
  • your_explore : nom de l'exploration Looker que l'agent de données doit interroger.
  • your_model_2 : nom du deuxième modèle LookML qui inclut l'exploration à laquelle vous souhaitez vous connecter. Vous pouvez répéter cette variable pour d'autres modèles, jusqu'à cinq explorations.
  • your_explore_2 : nom de l'exploration Looker supplémentaire que l'agent de données doit interroger. Vous pouvez répéter cette variable pour inclure jusqu'à cinq explorations.

Se connecter aux données BigQuery

L'API Conversational Analytics ne limite pas le nombre de tables BigQuery auxquelles vous pouvez vous connecter. Toutefois, si vous vous connectez à un grand nombre de tables, la précision peut diminuer ou vous risquez de dépasser la limite de jetons d'entrée du modèle.

L'exemple de code suivant définit une connexion à plusieurs tables BigQuery et inclut des exemples de champs de contexte structuré facultatifs. Pour améliorer les performances de l'agent, vous pouvez éventuellement fournir un contexte structuré pour vos tables BigQuery, comme des descriptions de tables et de colonnes, des synonymes, des tags et des exemples de requêtes. Pour en savoir plus, consultez Définir le contexte de l'agent de données pour les sources de données BigQuery.

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id",
                "schema": {
                    "description": "my_table_description",
                    "fields": [{
                        "name": "my_column_name",
                        "description": "my_column_description"
                    }]
                }
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

Remplacez les exemples de valeurs comme suit :

  • my_project_id : ID du projet Google Cloud contenant l'ensemble de données et la table BigQuery auxquels vous souhaitez vous connecter. Pour vous connecter à un ensemble de données public, spécifiez bigquery-public-data.
  • my_dataset_id : ID de l'ensemble de données BigQuery.
  • my_table_id : ID de la table BigQuery.
  • my_table_description : description facultative du contenu et de l'objectif de la table.
  • my_column_name : nom d'une colonne de la table pour laquelle vous fournissez une description facultative.
  • my_column_description : description facultative du contenu et de l'objectif de la colonne.

Se connecter aux données Looker Studio

L'exemple de code suivant définit une connexion à une source de données Looker Studio.

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

Remplacez studio_datasource_id par l'ID de la source de données.

Se connecter aux données AlloyDB

L'API Conversational Analytics utilise un champ database_reference dans le champ alloydb pour se connecter à votre cluster AlloyDB.

L'exemple suivant définit une connexion à une base de données AlloyDB.

alloydb_data_source = {
    "alloydb": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }

    }
}

Remplacez les exemples de valeurs comme suit :

  • PROJECT_ID : ID du projet Google Cloud contenant le cluster AlloyDB.
  • REGION : région où réside le cluster AlloyDB, par exemple us-central1.
  • CLUSTER_ID : ID du cluster AlloyDB.
  • INSTANCE_ID : ID de l'instance AlloyDB.
  • DATABASE : nom de la base de données cible, par exemple financial.
  • TABLE_1_ID, TABLE_2_ID : facultatif. Liste des tables de la base de données que l'agent de données est invité à utiliser (par exemple, "loan", "client", "disp").
  • billing_project : ID du projet de facturation dans lequel vous avez activé les API requises.
  • location : emplacement de votre ensemble de contexte (par exemple, "global").
  • your_context_set_id : ID de votre ensemble de contexte si vous avez créé un contexte avancé pour l'agent.

Se connecter aux données Cloud SQL pour MySQL et Cloud SQL pour PostgreSQL

L'API Conversational Analytics utilise un champ database_reference dans le champ sql pour se connecter à votre instance Cloud SQL.

L'exemple suivant définit une connexion à une base de données Cloud SQL pour MySQL ou Cloud SQL pour PostgreSQL.

sql_data_source = {
    "cloud_sql_reference": {
        "database_reference":
          {
            "project_id":"PROJECT_ID",
            "region":"REGION",
            "cluster_id":"CLUSTER_ID",
            "instance_id":"INSTANCE_ID",
            "database_id":"DATABASE_ID",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
          }
          # Optional: Include this if you have pre-authored context for the agent
          # "agent_context_reference": {
          #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
          # }
    }
}

Remplacez les exemples de valeurs comme suit :

  • PROJECT_ID : ID du projet Google Cloud contenant l'instance Cloud SQL.
  • REGION : région dans laquelle réside l'instance Cloud SQL (par exemple, us-central1).
  • CLUSTER_ID : ID du cluster Cloud SQL.
  • INSTANCE_ID : ID de l'instance Cloud SQL.
  • DATABASE_ID : nom de la base de données cible, par exemple financial.
  • TABLE_1_ID, TABLE_2_ID : facultatif. Liste des tables de la base de données que l'agent de données est invité à utiliser (par exemple, "loan", "client", "disp").
  • billing_project : ID du projet de facturation dans lequel vous avez activé les API requises.
  • location : emplacement de votre ensemble de contexte (par exemple, "global").
  • your_context_set_id : ID de votre ensemble de contexte si vous avez créé un contexte avancé pour l'agent.

Se connecter aux données Spanner

L'API Conversational Analytics utilise un champ database_reference dans le champ spanner pour se connecter à votre instance Spanner.

L'exemple suivant définit une connexion à une base de données Spanner.

spanner_data_sources = {
    "spanner_reference": {
        "database_reference": {
            "project_id": "PROJECT_ID",
            "region": location,
            "engine": GOOGLE_SQL,
            "instance_id": "INSTANCE_ID",
            "database_id": "DATABASE",
            "table_ids":["TABLE_1_ID", "TABLE_2_ID"]
        },
        # Optional: Include this if you have pre-authored context for the agent
        # "agent_context_reference": {
        #     "context_set_id": f"projects/billing_project/locations/location/contextSets/your_context_set_id"
        # }
    }
}

Remplacez les exemples de valeurs comme suit :

  • PROJECT_ID : ID du projet Google Cloud contenant l'instance Spanner.
  • INSTANCE_ID : ID de l'instance Spanner.
  • DATABASE : nom de la base de données cible, par exemple financial.
  • TABLE_1_ID, TABLE_2_ID : liste des tables de la base de données que l'agent de données est invité à utiliser (par exemple, "loan", "client", "disp").
  • billing_project : ID du projet de facturation dans lequel vous avez activé les API requises.
  • location : emplacement de votre ensemble de contexte (par exemple, "global").
  • your_context_set_id : si vous avez créé un contexte avancé pour l'agent, ID de votre ensemble de contexte.

Créer un agent de données

L'exemple de code suivant montre comment créer l'agent de données de manière synchrone ou asynchrone en envoyant une requête HTTP POST au point de terminaison de création de l'agent de données. La charge utile de la requête inclut les informations suivantes :

Vous pouvez éventuellement protéger l'agent de données à l'aide de clés de chiffrement gérées par le client (CMEK) en fournissant un kms_key lors de la création. Le chiffrement CMEK n'est compatible qu'avec les sources de données Looker. Pour en savoir plus, consultez Clés de chiffrement gérées par le client (CMEK).

Vous pouvez également activer l'analyse avancée avec Python en incluant le paramètre options dans la charge utile de la requête. Consultez la ressource REST projects.locations.dataAgents pour en savoir plus sur le paramètre options et les options que vous pouvez configurer pour la conversation.

Synchrone

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents:createSync"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Asynchrone

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional
      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  data_agent_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Remplacez les exemples de valeurs comme suit :

  • data_agent_1 : identifiant unique de l'agent de données. Cette valeur est utilisée dans le nom de ressource de l'agent et comme paramètre de requête d'URL data_agent_id.
  • KMS_PROJECT_ID : Si vous utilisez une clé CMEK, ID du projet dans lequel la clé est hébergée. Si vous ne le fournissez pas, la valeur par défaut est votre projet de facturation.
  • KEY_RING_NAME : si vous utilisez CMEK, nom de votre trousseau de clés Cloud KMS.
  • KEY_NAME : si vous utilisez CMEK, nom de votre clé Cloud KMS.
  • This is the description of data_agent_1. : description de l'agent de données.

Créer une conversation

L'exemple de code suivant montre comment créer une conversation avec votre agent de données.

Vous pouvez éventuellement protéger la conversation avec une clé CMEK en fournissant un kms_key lors de la création. Le chiffrement CMEK n'est compatible qu'avec les sources de données Looker. Pour en savoir plus, consultez Clés de chiffrement gérées par le client (CMEK).

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Optional: If using CMEK, replace the empty strings with your KMS key details.
kms_project_id = ""  # KMS_PROJECT_ID (Defaults to billing_project if empty)
key_ring_name = ""   # KEY_RING_NAME
key_name = ""        # KEY_NAME

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
}

# If key details are provided, construct and add the kms_key field.
if key_ring_name and key_name:
  if not kms_project_id:
    kms_project_id = billing_project
  conversation_payload["kms_key"] = f"projects/{kms_project_id}/locations/{location}/keyRings/{key_ring_name}/cryptoKeys/{key_name}"

params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Remplacez les exemples de valeurs comme suit :

  • data_agent_1 : ID de l'agent de données, tel que défini dans l'exemple de bloc de code de la section Créer un agent de données.
  • conversation_1 : identifiant unique de la conversation.
  • KMS_PROJECT_ID : Si vous utilisez une clé CMEK, ID du projet dans lequel la clé est hébergée. Si vous ne le fournissez pas, la valeur par défaut est votre projet de facturation.
  • KEY_RING_NAME : si vous utilisez CMEK, nom de votre trousseau de clés Cloud KMS.
  • KEY_NAME : si vous utilisez CMEK, nom de votre clé Cloud KMS.

Gérer les agents de données et les conversations

Les exemples de code suivants montrent comment gérer vos agents de données et vos conversations à l'aide de l'API Conversational Analytics. Vous pouvez effectuer les tâches suivantes :

Obtenir un agent de données

L'exemple de code suivant montre comment récupérer un agent de données existant en envoyant une requête HTTP GET à l'URL de la ressource de l'agent de données.

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

Dans l'exemple précédent, remplacez data_agent_1 par l'ID de l'agent de données que vous souhaitez récupérer.

Lister les agents de données

Le code suivant montre comment lister tous les agents de données d'un projet donné en envoyant une requête HTTP GET au point de terminaison dataAgents.

Pour recenser tous les agents, vous devez disposer de l'autorisation geminidataanalytics.dataAgents.list sur le projet. Pour en savoir plus sur les rôles IAM incluant cette autorisation, consultez la liste des rôles prédéfinis.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

Remplacez YOUR-BILLING-PROJECT par l'ID de votre projet de facturation.

Lister les agents de données accessibles

Le code suivant montre comment lister tous les agents de données accessibles pour un projet donné en envoyant une requête HTTP GET au point de terminaison dataAgents:listAccessible.

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • YOUR-CREATOR-FILTER : filtre à appliquer en fonction du créateur de l'agent de données. Les valeurs possibles sont NONE (par défaut), CREATOR_ONLY et NOT_CREATOR_ONLY.

Mettre à jour un agent de données

L'exemple de code suivant montre comment mettre à jour un agent de données de manière synchrone ou asynchrone en envoyant une requête HTTP PATCH à l'URL de la ressource de l'agent de données. La charge utile de la requête inclut les nouvelles valeurs des champs que vous souhaitez modifier, et les paramètres de requête incluent un paramètre updateMask qui spécifie les champs à mettre à jour.

Synchrone

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:updateSync"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Asynchrone

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Remplacez les exemples de valeurs comme suit :

  • data_agent_1 : ID de l'agent de données que vous souhaitez mettre à jour.
  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • Updated description of the data agent. : nouvelle description de l'agent de données.

Obtenir la stratégie IAM d'un agent de données

L'exemple de code suivant montre comment récupérer la stratégie IAM d'un agent de données en envoyant une requête HTTP POST à l'URL de l'agent de données. La charge utile de la requête inclut le chemin d'accès de l'agent de données.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • data_agent_1 : ID de l'agent de données pour lequel vous souhaitez obtenir la stratégie IAM.

Définir la stratégie IAM pour un agent de données

Pour partager un agent, vous pouvez utiliser la méthode setIamPolicy afin d'attribuer des rôles IAM aux utilisateurs sur un agent spécifique. L'exemple de code suivant montre comment effectuer un appel POST à l'URL de l'agent de données avec une charge utile incluant des liaisons. La liaison spécifie les rôles à attribuer à chaque utilisateur.

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • data_agent_1 : ID de l'agent de données pour lequel vous souhaitez définir la stratégie IAM.
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com : liste des adresses e-mail des utilisateurs auxquels vous souhaitez attribuer le rôle spécifié, séparées par une virgule.

Supprimer un agent de données

L'exemple de code suivant montre comment supprimer un agent de données de façon réversible de manière synchrone ou asynchrone en envoyant une requête HTTP DELETE à l'URL de la ressource de l'agent de données. Avec la suppression réversible, l'agent est supprimé mais peut être récupéré dans un délai de 30 jours.

Synchrone

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:deleteSync"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Asynchrone

billing_project = "YOUR-BILLING-PROJECT"
data_agent_id = "data_agent_1"

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • data_agent_1 : ID de l'agent de données que vous souhaitez supprimer.

Obtenir une conversation

L'exemple de code suivant montre comment récupérer une conversation existante en envoyant une requête HTTP GET à l'URL de la ressource de la conversation.

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • conversation_1 : ID de la conversation que vous souhaitez extraire.

Lister les conversations

L'exemple de code suivant montre comment lister les conversations d'un projet donné en envoyant une requête HTTP GET au point de terminaison conversations.

Par défaut, cette méthode renvoie les conversations que vous avez créées. Les administrateurs (utilisateurs disposant du rôle IAM cloudaicompanion.topicAdmin) peuvent voir toutes les conversations du projet.

billing_project = "YOUR-BILLING-PROJECT"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Remplacez YOUR-BILLING-PROJECT par l'ID du projet de facturation dans lequel vous avez activé les API requises.

Lister les messages d'une conversation

L'exemple de code suivant montre comment lister tous les messages d'une conversation en envoyant une requête HTTP GET au point de terminaison messages de la conversation.

Pour lister les messages, vous devez disposer de l'autorisation cloudaicompanion.topics.get sur la conversation.

billing_project = "YOUR-BILLING-PROJECT"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • conversation_1 : ID de la conversation pour laquelle vous souhaitez lister les messages.

Supprimer une conversation

L'exemple de code suivant montre comment supprimer une conversation en envoyant une requête HTTP DELETE à l'URL de la ressource de la conversation. Les administrateurs (utilisateurs disposant du rôle Identity and Access Management cloudaicompanion.topicAdmin) ou les utilisateurs disposant de l'autorisation IAM cloudaicompanion.topics.delete peuvent supprimer les conversations du projet.

billing_project = "YOUR-BILLING-PROJECT"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.delete(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation deleted successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while deleting conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Remplacez les exemples de valeurs comme suit :

  • YOUR-BILLING-PROJECT : ID de votre projet de facturation.
  • conversation_1 : ID de la conversation que vous souhaitez supprimer.

Utiliser l'API pour poser des questions

Une fois que vous avez créé un agent de données et, pour le chat avec état, une conversation, vous pouvez envoyer des requêtes à l'agent.

Lorsque vous envoyez une requête, l'API renvoie un flux d'objets Message. Ce flux peut contenir différents types de messages, y compris du texte, des tableaux de données et des graphiques. Les messages peuvent fournir des informations sur le raisonnement de l'agent, rendre compte de sa progression ou donner la réponse finale. L'objectif de chaque message est indiqué par sa valeur TextType :

  • THOUGHT : affiche le processus de réflexion interne de l'agent lorsqu'il planifie sa réponse à votre requête. Les messages THOUGHT fournissent des informations détaillées sur le processus de raisonnement et de prise de décision de l'agent. Ils se composent de deux parties : parts[0], qui est le résumé de la pensée, et parts[1], qui est le texte complet de la pensée.
  • PROGRESS : indique la progression de l'agent sur une action, comme la récupération de données ou un outil en cours d'appel. Cette valeur n'est renvoyée que pour les sources de données Looker et se compose de deux parties : parts[0] correspond au récapitulatif et parts[1] au texte complet de la progression.
  • FINAL_RESPONSE : fournit la réponse finale à votre requête.

Les exemples de la section suivante utilisent les fonctions d'assistance définies dans Fonctions d'assistance Python pour diffuser les réponses du chat afin de traiter et d'afficher chaque message de la réponse diffusée de l'API. Pour savoir comment afficher ces messages dans une interface utilisateur lorsque vous utilisez des sources de données Looker, consultez Afficher les réponses de l'agent pour les sources de données Looker.

Poser des questions avec le chat avec ou sans état

Vous pouvez interagir avec l'API de différentes manières :

  • Chat avec état : Google Cloud stocke et gère l'historique de la conversation. Le chat avec état est intrinsèquement multitour, car l'API conserve le contexte des messages précédents. Vous n'avez besoin d'envoyer que le message actuel pour chaque tour.
  • Chat sans état : votre application gère l'historique de la conversation. Vous devez inclure les précédents messages pertinents dans chaque nouveau message. Pour obtenir des exemples détaillés de la gestion des conversations multitours en mode sans état, consultez la section Créer une conversation multitour sans état.

Les exemples suivants montrent comment utiliser l'API pour le chat avec état et sans état en effectuant une requête POST au point de terminaison :chat.

Chat avec état

Envoyer une requête de chat avec état et référence à une conversation

L'exemple de code suivant montre comment poser des questions à l'API en utilisant la conversation que vous avez définie lors des étapes précédentes. Cet exemple utilise une fonction d'assistance get_stream pour diffuser la réponse.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

Remplacez les exemples de valeurs comme suit :

  • data_agent_1 : ID de l'agent de données, tel que défini dans l'exemple de bloc de code de la section Créer un agent de données.
  • conversation_1 : identifiant unique de la conversation.
  • La valeur Make a bar graph for the top 5 states by the total number of airports a été utilisée comme exemple de prompt.

Chat sans état

Les exemples suivants montrent comment envoyer des requêtes sans état en utilisant une référence d'agent de données ou un contexte intégré.

Envoyer une requête de chat sans état avec référence à un agent de données

L'exemple de code suivant montre comment poser une question sans état à l'API en utilisant l'agent de données que vous avez défini lors des étapes précédentes. Cet exemple utilise une fonction d'assistance get_stream pour diffuser la réponse.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

Remplacez les exemples de valeurs comme suit :

  • data_agent_1 : ID de l'agent de données, tel que défini dans l'exemple de bloc de code de la section Créer un agent de données.
  • La valeur Make a bar graph for the top 5 states by the total number of airports a été utilisée comme exemple de prompt.

Envoyer une requête de chat sans état avec un contexte intégré

L'exemple de code suivant montre comment poser une question sans état à l'API à l'aide du contexte intégré. Cet exemple utilise une fonction d'assistance get_stream pour diffuser la réponse et une source de données BigQuery comme exemple.

Vous pouvez également activer l'analyse avancée avec Python en incluant le paramètre options dans la charge utile de la requête. Consultez la page Ressource REST : projects.locations.dataAgents pour en savoir plus sur le paramètre options et les options que vous pouvez configurer pour la conversation.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

L'exemple de code suivant montre comment poser une question sans état à l'API à l'aide du contexte intégré et d'une source de données AlloyDB comme exemple.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

# The natural language question to ask the data agent
user_prompt = "what is the average loan amount in the US?"  # Replace with your question

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": user_prompt
            }
        }
    ],
    "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
    }
}

print(f"Sending prompt to :chat: '{user_prompt}'")
print(f"Endpoint: {chat_url}")
print(f"Payload: {json.dumps(chat_payload, indent=2)}")

get_stream(chat_url, chat_payload)

Créer une conversation multitour sans état

Pour qu'il soit possible de poser des questions complémentaires dans une conversation sans état, votre application doit gérer le contexte de la conversation en envoyant l'intégralité de l'historique des messages à chaque nouvelle requête. Les sections suivantes expliquent comment définir et appeler des fonctions d'assistance pour créer une conversation multitour :

Envoyer des requêtes multitours

La fonction d'assistance multi_turn_Conversation suivante gère le contexte de la conversation en stockant les messages dans une liste. Vous pouvez ainsi poser des questions complémentaires qui s'appuient sur les tours précédents. Dans la charge utile de la fonction, vous pouvez faire référence à un agent de données ou fournir directement la source de données en utilisant le contexte intégré.

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

Dans l'exemple précédent, remplacez data_agent_1 par l'ID de l'agent de données, tel que défini dans l'exemple de bloc de code de la section Créer un agent de données.

Vous pouvez appeler la fonction d'assistance multi_turn_Conversation pour chaque tour de conversation. L'exemple de code suivant montre comment envoyer une requête initiale, puis une requête complémentaire qui s'appuie sur la réponse précédente.

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

Dans l'exemple précédent, remplacez les exemples de valeurs comme suit :

  • Which species of tree is most prevalent? : question en langage naturel à envoyer à l'agent de données.
  • Can you show me the results as a bar chart? : question complémentaire qui s'appuie sur la question précédente ou la précise.

Traiter les réponses

La fonction get_stream_multi_turn suivante traite la réponse de l'API qui est diffusée. Cette fonction est semblable à la fonction d'assistance get_stream, mais elle stocke la réponse dans la liste conversation_messages afin d'enregistrer le contexte de la conversation pour le prochain tour.

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

Exemples de code de bout en bout

Les exemples de code suivants peuvent être développés et contiennent toutes les tâches abordées dans ce guide.

Créer un agent de données pour BigQuery et Looker à l'aide de HTTP et Python

    import json
    import json as json_lib
    import textwrap

    import altair as alt
    import google.auth
    from google.auth.transport.requests import Request
    import IPython
    from IPython.display import display, HTML
    import pandas as pd
    from pygments import formatters, highlight, lexers
    import requests

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
        "x-server-timeout": "300", # Custom timeout up to 600s
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
        "explore_references": [
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model",
                "explore": "your_explore",
            },
            {
                "looker_instance_uri": looker_instance_uri,
                "lookml_model": "your_model_2",
                "explore": "your_explore_2",
            },
            # Add up to 5 total Explore references
        ],
        # Do not include the following line during agent creation
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references": [
                {
                  "studio_datasource_id": "studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

Définir des fonctions d'assistance

L'exemple de code suivant contient des définitions de fonctions d'assistance utilisées dans les exemples de code précédents. Ces fonctions permettent d'analyser les réponses de l'API et d'afficher les résultats.

Fonctions Python d'assistance pour diffuser les réponses du chat

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      full_text = "".join(parts)
      if "\n" not in full_text and len(full_text) > 80:
        wrapped_text = textwrap.fill(full_text, width=80)
        print(wrapped_text)
      else:
        print(full_text)

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        if 'question' in query:
          print('Question: {}'.format(query['question']))
        if 'datasources' in query:
          print('Data sources:')
          for datasource in query['datasources']:
            display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''