Verbinden und Daten in BigQuery speichern

Wenn Sie Ihrer Gemini Enterprise Agent Platform Vision-App einen BigQuery-Connector hinzufügen, werden alle Ausgaben des verbundenen App-Modells in die Zieltabelle aufgenommen.

Sie können entweder eine eigene BigQuery-Tabelle erstellen und diese Tabelle angeben, wenn Sie der App einen BigQuery-Connector hinzufügen, oder die Tabelle automatisch von der Gemini Enterprise Agent Platform Vision-App-Plattform erstellen lassen.

Automatische Tabellenerstellung

Wenn Sie die Tabelle automatisch von der Gemini Enterprise Agent Platform Vision-App-Plattform erstellen lassen, können Sie diese Option angeben, wenn Sie den BigQuery-Connector-Knoten hinzufügen.

Die folgenden Dataset- und Tabellenbedingungen gelten, wenn Sie die automatische Tabellenerstellung verwenden möchten:

  • Dataset: Der automatisch erstellte Datasetname ist visionai_dataset.
  • Tabelle: Der automatisch erstellte Tabellenname ist visionai_dataset.APPLICATION_ID.
  • Fehlerbehandlung:

    • Wenn die Tabelle mit demselben Namen im selben Dataset vorhanden ist, erfolgt keine automatische Erstellung.

Console

  1. Öffnen Sie den Tab Anwendungen des Gemini Enterprise Agent Platform Vision-Dashboards.

    Tab „Anwendungen“ aufrufen

  2. Wählen Sie in der Liste neben dem Namen Ihrer Anwendung App ansehen aus.

  3. Wählen Sie auf der Seite des Anwendungs-Builders im Bereich Connectors (Connectors) die Option BigQuery aus.

  4. Lassen Sie das Feld BigQuery-Pfad leer.

    Tabellenpfad in der Benutzeroberfläche leer gelassen

  5. Ändern Sie alle anderen Einstellungen.

REST UND BEFEHLSZEILE

Wenn die App-Plattform ein Tabellenschema ableiten soll, verwenden Sie das createDefaultTableIfNotExists Feld von BigQueryConfig wenn Sie eine App erstellen oder aktualisieren.

Tabelle manuell erstellen und angeben

Wenn Sie Ihre Ausgabetabelle manuell verwalten möchten, muss die Tabelle das erforderliche Schema als Teilmenge des Tabellenschemas haben.

Wenn die vorhandene Tabelle inkompatible Schemas hat, wird die Bereitstellung abgelehnt.

Standardschema verwenden

Wenn Sie das Standardschema für Tabellen mit Modellausgaben verwenden, muss Ihre Tabelle nur die folgenden erforderlichen Spalten enthalten. Sie können den folgenden Schematext direkt kopieren, wenn Sie die BigQuery-Tabelle erstellen. Weitere Informationen zum Erstellen einer BigQuery-Tabelle finden Sie unter Tabellen erstellen und verwenden. Weitere Informationen zur Schemaspezifikation beim Erstellen einer Tabelle finden Sie unter Schema angeben.

Verwenden Sie den folgenden Text, um das Schema zu beschreiben, wenn Sie eine Tabelle erstellen. Informationen zur Verwendung des JSON Spaltentyps ("type": "JSON") finden Sie unter Mit JSON-Daten in Standard-SQL arbeiten. Der Spaltentyp „JSON“ wird für die Annotation-Abfrage empfohlen. Sie können auch "type" : "STRING" verwenden.

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
 {
   "name": "application",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "instance",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "node",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "annotation",
   "type": "JSON",
   "mode": "REQUIRED"
 }
]

Google Cloud Console

  1. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  2. Wählen Sie Ihr Projekt aus.

  3. Wählen Sie „Weitere Optionen“ aus.

  4. Klicken Sie auf Tabelle erstellen.

  5. Aktivieren Sie im Abschnitt „Schema“ die Option Als Text bearbeiten.

Standardbild für Schema

gcloud

Im folgenden Beispiel wird zuerst die JSON-Anfragedatei erstellt und dann der gcloud alpha bq tables create Befehl verwendet.

  1. Erstellen Sie zuerst die JSON-Anfragedatei:

    echo "{
    \"schema\": [
        {
          \"name\": \"ingestion_time\",
          \"type\": \"TIMESTAMP\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"application\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"instance\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"node\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"annotation\",
          \"type\": \"JSON\",
          \"mode\": \"REQUIRED\"
        }
    ]
    }
    " >> bigquery_schema.json
  2. Senden Sie den gcloud-Befehl. Ersetzen Sie die folgenden Werte:

    • TABLE_NAME: Die ID der Tabelle oder die voll qualifizierte Kennung für die Tabelle.

    • DATASET: Die ID des BigQuery-Datasets.

    gcloud alpha bq tables create TABLE_NAME \
    --dataset=DATASET \
    --schema-file=./bigquery_schema.json
    

Beispiel für BigQuery-Zeilen, die von einer Gemini Enterprise Agent Platform Vision-App generiert wurden:

ingestion_time application instance node annotation
2022-05-11 23:3211.911378 UTC my_application 5 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE1Eg5teV9hcHBsaWNhdGlvbgjS+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911338 UTC my_application 1 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgExEg5teV9hcHBsaWNhdGlvbgiq+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911313 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgiR+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3212.235327 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgi/3J3Ozdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}

Benutzerdefiniertes Schema verwenden

Wenn das Standardschema für Ihren Anwendungsfall nicht geeignet ist, können Sie Cloud Run-Funktionen verwenden, um BigQuery-Zeilen mit einem benutzerdefinierten Schema zu generieren. Wenn Sie ein benutzerdefiniertes Schema verwenden, gibt es keine Voraussetzung für das BigQuery-Tabellenschema.

App-Diagramm mit ausgewähltem BigQuery-Knoten

App-Diagramm mit BigQuery verbunden

Der BigQuery-Connector kann mit jedem Modell verbunden werden, das video- oder protobasierte Annotationen ausgibt:

  • Bei Videoeingaben extrahiert der BigQuery-Connector nur die Metadaten, die im Streamheader gespeichert sind, und nimmt diese Daten als andere Modellanmerkungsausgaben in BigQuery auf. Das Video selbst wird nicht gespeichert.
  • Wenn Ihr Stream keine Metadaten enthält, wird nichts in BigQuery gespeichert.

Tabellendaten abfragen

Mit dem Standard-BigQuery-Tabellenschema können Sie nach dem Füllen der Tabelle mit Daten leistungsstarke Analysen durchführen.

Beispielabfragen

Sie können die folgenden Beispielabfragen in BigQuery verwenden, um Erkenntnisse aus Gemini Enterprise Agent Platform Vision-Modellen zu gewinnen.

Mit BigQuery können Sie beispielsweise mit Daten aus dem Modell „Personen-/Fahrzeugerkennung“ mit der folgenden Abfrage eine zeitbasierte Kurve für die maximale Anzahl der erkannten Personen pro Minute erstellen:

WITH
 nested3 AS(
 WITH
   nested2 AS (
   WITH
     nested AS (
     SELECT
       t.ingestion_time AS ingestion_time,
       JSON_QUERY_ARRAY(t.annotation.stats["fullFrameCount"]) AS counts
     FROM
       `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
   SELECT
     ingestion_time,
     e
   FROM
     nested,
     UNNEST(nested.counts) AS e)
 SELECT
   STRING(TIMESTAMP_TRUNC(nested2.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested2.e["count"]), 0) AS person_count
 FROM
   nested2
 WHERE
   JSON_VALUE(nested2.e["entity"]["labelString"])="Person")
SELECT
 time,
 MAX(person_count)
FROM
 nested3
GROUP BY
 time

Ebenso können Sie mit BigQuery und der Funktion zur Zählung der Überquerungslinien des Modells „Auslastungsanalyse“ eine Abfrage erstellen, die die Gesamtzahl der Fahrzeuge zählt, die die Überquerungslinie pro Minute passieren:

WITH
 nested4 AS (
 WITH
   nested3 AS (
   WITH
     nested2 AS (
     WITH
       nested AS (
       SELECT
         t.ingestion_time AS ingestion_time,
         JSON_QUERY_ARRAY(t.annotation.stats["crossingLineCounts"]) AS lines
       FROM
         `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
     SELECT
       nested.ingestion_time,
       JSON_QUERY_ARRAY(line["positiveDirectionCounts"]) AS entities
     FROM
       nested,
       UNNEST(nested.lines) AS line
     WHERE
       JSON_VALUE(line.annotation.id) = "LINE_ANNOTATION_ID")
   SELECT
     ingestion_time,
     entity
   FROM
     nested2,
     UNNEST(nested2.entities) AS entity )
 SELECT
   STRING(TIMESTAMP_TRUNC(nested3.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested3.entity["count"]), 0) AS vehicle_count
 FROM
   nested3
 WHERE
   JSON_VALUE(nested3.entity["entity"]["labelString"])="Vehicle" )
SELECT
 time,
 SUM(vehicle_count)
FROM
 nested4
GROUP BY
 time

Abfrage ausführen

Nachdem Sie Ihre Google Standard-SQL-Abfrage formatiert haben, können Sie die Console verwenden, um die Abfrage auszuführen:

Console

  1. Öffnen Sie in der Google Cloud Console die Seite „BigQuery“.

    BigQuery aufrufen

  2. Wählen Sie neben dem Namen Ihres Datasets Maximieren und dann den Namen Ihrer Tabelle aus.

  3. Klicken Sie in der Tabellendetailansicht auf Neue Abfrage erstellen.

    Neue Abfrage erstellen

  4. Geben Sie im Textbereich Abfrageeditor eine Google Standard-SQL-Abfrage ein. Beispielabfragen finden Sie unter Beispielabfragen.

  5. Optional: Klicken Sie auf Mehr und dann auf Abfrageeinstellungen , um den Ort der Datenverarbeitung zu ändern. Klicken Sie unter Verarbeitungsstandort auf Automatische Auswahl und wählen Sie den Standort Ihrer Daten aus. Klicken Sie abschließend auf Save (Speichern), um die Abfrageeinstellungen zu aktualisieren.

  6. Klicken Sie auf Run (Ausführen).

Dies erstellt einen Abfragejob, der die Ausgabe in eine temporäre Tabelle schreibt.

Cloud Run-Funktionen integrieren

Sie können Cloud Run-Funktionen verwenden, um eine zusätzliche Datenverarbeitung mit Ihrer benutzerdefinierten BigQuery-Aufnahme auszulösen. So verwenden Sie Cloud Run-Funktionen für Ihre benutzerdefinierte BigQuery-Aufnahme:

  • Wenn Sie die Google Cloud Console verwenden, wählen Sie die entsprechende Cloud-Funktion im Drop-down-Menü jedes verbundenen Modells aus.

    Cloud Functions-Funktionsbild auswählen

  • Wenn Sie die Gemini Enterprise Agent Platform Vision API verwenden, fügen Sie dem cloud_function_mapping Feld von BigQueryConfig im BigQuery-Knoten ein Schlüssel-Wert-Paar hinzu. Der Schlüssel ist der Name des BigQuery-Knotens und der Wert ist der HTTP-Trigger der Zielfunktion.

Damit Sie Cloud Run-Funktionen mit Ihrer benutzerdefinierten BigQuery-Aufnahme verwenden können, muss die Funktion die folgenden Anforderungen erfüllen:

  • Die Cloud Run-Funktionsinstanz muss erstellt werden, bevor Sie den BigQuery-Knoten erstellen.
  • Die Gemini Enterprise Agent Platform Vision API erwartet, dass eine AppendRowsRequest-Annotation von Cloud Run-Funktionen zurückgegeben wird.
  • Sie müssen das Feld proto_rows.writer_schema für alle CloudFunction Antworten festlegen. write_stream kann ignoriert werden.

Beispiel für die Integration von Cloud Run-Funktionen

Im folgenden Beispiel wird gezeigt, wie Sie die Ausgabe des Knotens „Auslastungszählung“ (OccupancyCountPredictionResult) parsen und daraus ein Tabellenschema für ingestion_time, person_count und vehicle_count extrahieren.

Das Ergebnis des folgenden Beispiels ist eine BigQuery-Tabelle mit dem Schema:

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
  {
    "name": "person_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
      {
    "name": "vehicle_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
]

Verwenden Sie den folgenden Code, um diese Tabelle zu erstellen:

  1. Definieren Sie ein Proto (z. B. test_table_schema.proto) für die Tabellenfelder, die Sie schreiben möchten:

    syntax = "proto3";
    
    package visionai.testing;
    
    message TestTableSchema {
      int64 ingestion_time = 1;
      int32 person_count = 2;
      int32 vehicle_count = 3;
    }
    
  2. Kompilieren Sie die Protodatei, um die Protocol Buffer-Python-Datei zu generieren:

    protoc -I=./ --python_out=./ ./test_table_schema.proto
    
  3. Importieren Sie die generierte Python-Datei und schreiben Sie die Cloud-Funktion:

    Python

    import base64
    import sys
    
    from flask import jsonify
    import functions_framework
    from google.protobuf import descriptor_pb2
    from google.protobuf.json_format import MessageToDict
    import test_table_schema_pb2
    
    def table_schema():
      schema = descriptor_pb2.DescriptorProto()
      test_table_schema_pb2.DESCRIPTOR.message_types_by_name[
          'TestTableSchema'].CopyToProto(schema)
      return schema
    
    def bigquery_append_row_request(row):
      append_row_request = {}
      append_row_request['protoRows'] = {
          'writerSchema': {
              'protoDescriptor': MessageToDict(table_schema())
          },
          'rows': {
              'serializedRows':
                  base64.b64encode(row.SerializeToString()).decode('utf-8')
          }
      }
      return append_row_request
    
    @functions_framework.http
    def hello_http(request):
      request_json = request.get_json(silent=False)
      annotations = []
      payloads = []
      if request_json and 'annotations' in request_json:
        for annotation_with_timestamp in request_json['annotations']:
          row = test_table_schema_pb2.TestTableSchema()
          row.person_count = 0
          row.vehicle_count = 0
          if 'ingestionTimeMicros' in annotation_with_timestamp:
            row.ingestion_time = int(
                annotation_with_timestamp['ingestionTimeMicros'])
          if 'annotation' in annotation_with_timestamp:
            annotation = annotation_with_timestamp['annotation']
            if 'stats' in annotation:
              stats = annotation['stats']
              for count in stats['fullFrameCount']:
                if count['entity']['labelString'] == 'Person':
                  if 'count' in count:
                    row.person_count = count['count']
                elif count['entity']['labelString'] == 'Vehicle':
                  if 'count' in count:
                    row.vehicle_count = count['count']
          payloads.append(bigquery_append_row_request(row))
      for payload in payloads:
        annotations.append({'annotation': payload})
      return jsonify(annotations=annotations)
  4. Wenn Sie Ihre Abhängigkeiten in Cloud Run-Funktionen einbeziehen möchten, müssen Sie auch die generierte Datei test_table_schema_pb2.py hochladen und requirements.txt ähnlich wie folgt angeben:

    functions-framework==3.*
    click==7.1.2
    cloudevents==1.2.0
    deprecation==2.1.0
    Flask==1.1.2
    gunicorn==20.0.4
    itsdangerous==1.1.0
    Jinja2==2.11.2
    MarkupSafe==1.1.1
    pathtools==0.1.2
    watchdog==1.0.2
    Werkzeug==1.0.1
    protobuf==3.12.2
    
  5. Stellen Sie die Cloud-Funktion bereit und legen Sie den entsprechenden HTTP-Trigger in der BigQueryConfig fest.