Conectar y almacenar datos en BigQuery

Cuando agregues un conector de BigQuery a tu app de Vision de Agent Platform de Gemini Enterprise, todos los resultados del modelo de la app conectada se transferirán a la tabla de destino.

Puedes crear tu propia tabla de BigQuery y especificarla cuando agregues un conector de BigQuery a la app, o bien permitir que la plataforma de apps de Vision de Agent Platform de Gemini Enterprise cree la tabla automáticamente.

Creación automática de tablas

Si permites que la plataforma de apps de Vision de Agent Platform de Gemini Enterprise cree automáticamente la tabla, puedes especificar esta opción cuando agregues el nodo del conector de BigQuery.

Si deseas usar la creación automática de tablas, se aplican las siguientes condiciones del conjunto de datos y la tabla:

  • Conjunto de datos: El nombre del conjunto de datos creado automáticamente es visionai_dataset.
  • Tabla: El nombre de la tabla creada automáticamente es visionai_dataset.APPLICATION_ID.
  • Manejo de errores:

    • Si existe una tabla con el mismo nombre en el mismo conjunto de datos, no se produce la creación automática.

Console

  1. Abre la pestaña Applications del panel de Vision de Gemini Enterprise Agent Platform.

    Ir a la pestaña Aplicaciones

  2. Selecciona Ver app junto al nombre de tu aplicación en la lista.

  3. En la página del creador de aplicaciones, selecciona BigQuery en la sección Conectores.

  4. Deja vacío el campo Ruta de BigQuery.

    Ruta de acceso a la tabla especificada en blanco en la IU

  5. Cambia cualquier otro parámetro de configuración.

LÍNEA DE REST Y CMD

Para permitir que la plataforma de apps infiera un esquema de tabla, usa el campo createDefaultTableIfNotExists de BigQueryConfig cuando crees o actualices una app.

Crea y especifica una tabla de forma manual

Si deseas administrar tu tabla de salida de forma manual, esta debe tener el esquema requerido como un subconjunto del esquema de la tabla.

Si la tabla existente tiene esquemas incompatibles, se rechazará la implementación.

Usa el esquema predeterminado

Si usas el esquema predeterminado para las tablas de resultados del modelo, asegúrate de que tu tabla solo contenga las siguientes columnas obligatorias. Puedes copiar directamente el siguiente texto del esquema cuando crees la tabla de BigQuery. Para obtener información más detallada sobre cómo crear una tabla de BigQuery, consulta Crea y usa tablas. Para obtener más información sobre cómo especificar un esquema cuando creas una tabla, consulta Especifica un esquema.

Usa el siguiente texto para describir el esquema cuando crees una tabla. Para obtener información sobre el uso del tipo de columna JSON ("type": "JSON"), consulta Trabaja con datos JSON en SQL estándar. Se recomienda el tipo de columna JSON para la consulta de anotaciones. También puedes usar "type" : "STRING".

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
 {
   "name": "application",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "instance",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "node",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "annotation",
   "type": "JSON",
   "mode": "REQUIRED"
 }
]

Consola de Google Cloud

  1. En la consola de Google Cloud , ve a la página BigQuery.

    Ir a BigQuery

  2. Elige tu proyecto.

  3. Selecciona más opciones .

  4. Haz clic en Crear tabla.

  5. En la sección "Esquema", habilita Editar como texto.

Imagen del esquema predeterminado

gcloud

En el siguiente ejemplo, primero se crea el archivo JSON de la solicitud y, luego, se usa el comando gcloud alpha bq tables create.

  1. Primero, crea el archivo JSON de la solicitud:

    echo "{
    \"schema\": [
        {
          \"name\": \"ingestion_time\",
          \"type\": \"TIMESTAMP\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"application\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"instance\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"node\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"annotation\",
          \"type\": \"JSON\",
          \"mode\": \"REQUIRED\"
        }
    ]
    }
    " >> bigquery_schema.json
  2. Envía el comando gcloud. Realiza los siguientes reemplazos:

    • TABLE_NAME: Es el ID de la tabla o el identificador completamente calificado de la tabla.

    • DATASET: Es el ID del conjunto de datos de BigQuery.

    gcloud alpha bq tables create TABLE_NAME \
    --dataset=DATASET \
    --schema-file=./bigquery_schema.json
    

Ejemplo de filas de BigQuery generadas por una app de Vision de Agent Platform de Gemini Enterprise:

ingestion_time aplicación instancia nodo anotación
2022-05-11 23:3211.911378 UTC my_application 5 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE1Eg5teV9hcHBsaWNhdGlvbgjS+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911338 UTC my_application 1 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgExEg5teV9hcHBsaWNhdGlvbgiq+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911313 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgiR+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3212.235327 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgi/3J3Ozdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}

Usa un esquema personalizado

Si el esquema predeterminado no funciona para tu caso de uso, puedes usar Cloud Run Functions para generar filas de BigQuery con un esquema definido por el usuario. Si usas un esquema personalizado, no hay requisitos previos para el esquema de la tabla de BigQuery.

Gráfico de la app con el nodo de BigQuery seleccionado

grafo de la app conectado a BigQuery

El conector de BigQuery se puede conectar a cualquier modelo que genere anotaciones basadas en video o en protos:

  • En el caso de la entrada de video, el conector de BigQuery solo extrae los datos de metadatos almacenados en el encabezado de la transmisión y los transfiere a BigQuery como otras salidas de anotación del modelo. El video en sí no se almacena.
  • Si tu transmisión no contiene metadatos, no se almacenará nada en BigQuery.

Consultar datos de tablas

Con el esquema de tabla de BigQuery predeterminado, puedes realizar análisis potentes después de que la tabla se complete con datos.

Consultas de muestra

Puedes usar las siguientes consultas de ejemplo en BigQuery para obtener estadísticas a partir de los modelos de visión de Agent Platform de Gemini Enterprise.

Por ejemplo, puedes usar BigQuery para generar una curva basada en el tiempo que muestre la cantidad máxima de personas detectadas por minuto con los datos del modelo de detector de personas o vehículos con la siguiente consulta:

WITH
 nested3 AS(
 WITH
   nested2 AS (
   WITH
     nested AS (
     SELECT
       t.ingestion_time AS ingestion_time,
       JSON_QUERY_ARRAY(t.annotation.stats["fullFrameCount"]) AS counts
     FROM
       `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
   SELECT
     ingestion_time,
     e
   FROM
     nested,
     UNNEST(nested.counts) AS e)
 SELECT
   STRING(TIMESTAMP_TRUNC(nested2.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested2.e["count"]), 0) AS person_count
 FROM
   nested2
 WHERE
   JSON_VALUE(nested2.e["entity"]["labelString"])="Person")
SELECT
 time,
 MAX(person_count)
FROM
 nested3
GROUP BY
 time

De manera similar, puedes usar BigQuery y la función de recuento de líneas de cruce del modelo de análisis de ocupación para crear una consulta que cuente la cantidad total de vehículos que cruzan la línea por minuto:

WITH
 nested4 AS (
 WITH
   nested3 AS (
   WITH
     nested2 AS (
     WITH
       nested AS (
       SELECT
         t.ingestion_time AS ingestion_time,
         JSON_QUERY_ARRAY(t.annotation.stats["crossingLineCounts"]) AS lines
       FROM
         `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
     SELECT
       nested.ingestion_time,
       JSON_QUERY_ARRAY(line["positiveDirectionCounts"]) AS entities
     FROM
       nested,
       UNNEST(nested.lines) AS line
     WHERE
       JSON_VALUE(line.annotation.id) = "LINE_ANNOTATION_ID")
   SELECT
     ingestion_time,
     entity
   FROM
     nested2,
     UNNEST(nested2.entities) AS entity )
 SELECT
   STRING(TIMESTAMP_TRUNC(nested3.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested3.entity["count"]), 0) AS vehicle_count
 FROM
   nested3
 WHERE
   JSON_VALUE(nested3.entity["entity"]["labelString"])="Vehicle" )
SELECT
 time,
 SUM(vehicle_count)
FROM
 nested4
GROUP BY
 time

Ejecuta tu consulta.

Después de darle formato a tu consulta en SQL estándar de Google, puedes usar la consola para ejecutarla:

Console

  1. En la consola de Google Cloud , abre la página de BigQuery.

    Ir a BigQuery

  2. Selecciona Expand junto al nombre de tu conjunto de datos y, luego, selecciona el nombre de tu tabla.

  3. En la vista de detalles de la tabla, haz clic en Redactar consulta nueva.

    Redactar consulta nueva

  4. Ingresa una consulta en SQL estándar de Google en el área de texto del Editor de consultas. Para ver ejemplos de consultas, consulta Consultas de ejemplo.

  5. Opcional: Para cambiar la ubicación de procesamiento de datos, haz clic en Más y, luego, en Configuración de consulta. En Ubicación de procesamiento, haz clic en Selección automática y elige la ubicación de tus datos. Por último, haz clic en Guardar para actualizar la configuración de la consulta.

  6. Haz clic en Ejecutar.

Esto crea un trabajo de consulta que escribe el resultado en una tabla temporal.

Integración de Cloud Run Functions

Puedes usar funciones de Cloud Run que activen el procesamiento de datos adicional con tu transferencia personalizada de BigQuery. Para usar Cloud Run Functions en tu transferencia personalizada de BigQuery, haz lo siguiente:

  • Cuando uses la consola de Google Cloud , selecciona la función de Cloud correspondiente en el menú desplegable de cada modelo conectado.

    Selecciona la imagen de Cloud Function

  • Cuando uses la API de Vision de Agent Platform de Gemini Enterprise, agrega un par clave-valor al campo cloud_function_mapping de BigQueryConfig en el nodo de BigQuery. La clave es el nombre del nodo de BigQuery y el valor es el activador HTTP de la función de destino.

Para usar Cloud Run Functions con tu transferencia personalizada de BigQuery, la función debe cumplir con los siguientes requisitos:

  • La instancia de Cloud Run Functions debe crearse antes de que crees el nodo de BigQuery.
  • La API de Vision de Agent Platform de Gemini Enterprise espera recibir una anotación AppendRowsRequest que devuelvan las funciones de Cloud Run.
  • Debes configurar el campo proto_rows.writer_schema para todas las respuestas de CloudFunction. Se puede ignorar write_stream.

Ejemplo de integración de Cloud Run Functions

En el siguiente ejemplo, se muestra cómo analizar el resultado del nodo de recuento de ocupación (OccupancyCountPredictionResult) y extraer de él un esquema de tabla ingestion_time, person_count y vehicle_count.

El resultado del siguiente ejemplo es una tabla de BigQuery con el siguiente esquema:

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
  {
    "name": "person_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
      {
    "name": "vehicle_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
]

Usa el siguiente código para crear esta tabla:

  1. Define un .proto (por ejemplo, test_table_schema.proto) para los campos de la tabla que deseas escribir:

    syntax = "proto3";
    
    package visionai.testing;
    
    message TestTableSchema {
      int64 ingestion_time = 1;
      int32 person_count = 2;
      int32 vehicle_count = 3;
    }
    
  2. Compila el archivo .proto para generar el archivo de Python del búfer de protocolo:

    protoc -I=./ --python_out=./ ./test_table_schema.proto
    
  3. Importa el archivo de Python generado y escribe la función de Cloud.

    Python

    import base64
    import sys
    
    from flask import jsonify
    import functions_framework
    from google.protobuf import descriptor_pb2
    from google.protobuf.json_format import MessageToDict
    import test_table_schema_pb2
    
    def table_schema():
      schema = descriptor_pb2.DescriptorProto()
      test_table_schema_pb2.DESCRIPTOR.message_types_by_name[
          'TestTableSchema'].CopyToProto(schema)
      return schema
    
    def bigquery_append_row_request(row):
      append_row_request = {}
      append_row_request['protoRows'] = {
          'writerSchema': {
              'protoDescriptor': MessageToDict(table_schema())
          },
          'rows': {
              'serializedRows':
                  base64.b64encode(row.SerializeToString()).decode('utf-8')
          }
      }
      return append_row_request
    
    @functions_framework.http
    def hello_http(request):
      request_json = request.get_json(silent=False)
      annotations = []
      payloads = []
      if request_json and 'annotations' in request_json:
        for annotation_with_timestamp in request_json['annotations']:
          row = test_table_schema_pb2.TestTableSchema()
          row.person_count = 0
          row.vehicle_count = 0
          if 'ingestionTimeMicros' in annotation_with_timestamp:
            row.ingestion_time = int(
                annotation_with_timestamp['ingestionTimeMicros'])
          if 'annotation' in annotation_with_timestamp:
            annotation = annotation_with_timestamp['annotation']
            if 'stats' in annotation:
              stats = annotation['stats']
              for count in stats['fullFrameCount']:
                if count['entity']['labelString'] == 'Person':
                  if 'count' in count:
                    row.person_count = count['count']
                elif count['entity']['labelString'] == 'Vehicle':
                  if 'count' in count:
                    row.vehicle_count = count['count']
          payloads.append(bigquery_append_row_request(row))
      for payload in payloads:
        annotations.append({'annotation': payload})
      return jsonify(annotations=annotations)
  4. Para incluir tus dependencias en Cloud Run Functions, también debes subir el archivo test_table_schema_pb2.py generado y especificar requirements.txt de manera similar a la siguiente:

    functions-framework==3.*
    click==7.1.2
    cloudevents==1.2.0
    deprecation==2.1.0
    Flask==1.1.2
    gunicorn==20.0.4
    itsdangerous==1.1.0
    Jinja2==2.11.2
    MarkupSafe==1.1.1
    pathtools==0.1.2
    watchdog==1.0.2
    Werkzeug==1.0.1
    protobuf==3.12.2
    
  5. Implementa la Cloud Function y configura el activador HTTP correspondiente en BigQueryConfig.