Stream-to-Stream-Joins in kontinuierlichen Abfragen verwenden

Wenn Sie Unterstützung für diese Funktion anfordern oder Feedback geben möchten, senden Sie eine E-Mail an bq-continuous-queries-feedback@google.com.

Kontinuierliche Abfragen unterstützen JOIN als zustandsbehafteten Vorgang. Mit zustandsbehafteten Vorgängen können kontinuierliche Abfragen komplexe Analysen durchführen, bei denen Informationen über mehrere Zeilen oder Zeitintervalle hinweg beibehalten werden müssen. Mit dieser Funktion können Sie Ereignisse aus verschiedenen Streams korrelieren, indem Sie die erforderlichen Daten während der Ausführung der Abfrage im Arbeitsspeicher speichern. Weitere Informationen zu zustandsbehafteten Vorgängen finden Sie unter Unterstützte zustandsbehaftete Vorgänge.

Stream-to-Stream-Joins sind Join-Vorgänge zwischen zwei oder mehr Tabellen, in die zeitbezogene Daten aufgenommen werden.

Unterstützte JOIN-Typen

Die folgenden JOIN-Typen werden von kontinuierlichen Abfragen unterstützt:

  • Stream-to-Stream-JOIN: Ein Join-Vorgang zwischen zwei oder mehr Tabellen, in die zeitbezogene Daten aufgenommen werden.
  • `INNER JOIN`

Nicht unterstützte JOIN-Typen

Die folgenden JOIN-Typen werden von kontinuierlichen Abfragen nicht unterstützt:

  • Stream-to-Static-Table-Joins: Ein Join, bei dem mindestens eine verknüpfte Tabelle eine statische Tabelle ist, in die keine zeitbezogenen Daten aufgenommen werden. Ein Beispiel für eine statische Tabelle ist eine Dimensionstabelle.
  • Andere Formen von JOIN-Vorgängen als INNER.
  • Joins ohne zeitbezogene JOIN-Klauseln.

Daten aus mehreren Streams verknüpfen

Die folgende Abfrage zeigt, wie Sie eine Tabelle mit Taxifahrten mit einer Tabelle mit Taxianfragen verknüpfen, das nächste verfügbare Taxi für den Anfragenden innerhalb eines Zeitfensters von 5 Minuten ermitteln und diese Daten in eine andere BigQuery-Tabelle exportieren:

INSERT INTO
 `real_time_taxi_streaming.matched_rides`
SELECT
 requests.timestamp AS request_time,
 requests.request_id,
 taxis.taxi_id,
 ST_DISTANCE(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude)
   ) AS distance_in_meters,
 taxis.timestamp AS taxi_available_time
FROM
 APPENDS (TABLE `real_time_taxi_streaming.ride_requests`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS requests
INNER JOIN
 APPENDS (TABLE `real_time_taxi_streaming.taxirides`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS taxis
ON
 requests.geohash = taxis.geohash
WHERE
 taxis.ride_status = 'available'
 AND taxis._CHANGE_TIMESTAMP BETWEEN(requests._CHANGE_TIMESTAMP - INTERVAL 5 MINUTE) AND requests._CHANGE_TIMESTAMP
 AND ST_DWITHIN(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude),
   2000 -- Distance in meters
   );

Überlegungen zu Joins

In den folgenden Abschnitten werden die notwendigen Überlegungen bei der Ausführung eines Stream-to-Stream-Joins beschrieben.

Beschränkungen

  • Es werden nur INNER JOIN-Vorgänge unterstützt. Andere Formen wie LEFT oder FULL OUTER werden nicht unterstützt.
  • Auf jeder Seite des INNER JOIN-Vorgangs muss eine Startzeit für die kontinuierliche Abfrage angegeben werden.
  • Neben einem Join-Schlüssel (z. B. table1.user_id = table2.user_id) muss die JOIN-Klausel eine Bedingung enthalten, um die Zeitstempelspalte auf ein konstantes Intervall zu beschränken. Diese Bedingung begrenzt, wie lange das System wartet, bis ein übereinstimmendes Ereignis im anderen Stream eintrifft. Sie können beispielsweise angeben, dass ein Ereignis aus einem Stream nur mit einem Ereignis aus einem anderen Stream verknüpft werden kann, wenn die Zeitstempel innerhalb eines Intervalls von 5 Minuten liegen. Sie sind nicht auf symmetrische Intervalle beschränkt. Sie können beispielsweise ein 5-Minuten-Intervall auf einer Seite der Join-Bedingung und ein 1-Stunden-Intervall auf der anderen Seite verwenden.
  • Beim Starten einer kontinuierlichen Abfrage mit einem Stream-to-Stream-Join wird nur die Funktion APPENDS unterstützt. Die Funktion CHANGES wird nicht unterstützt.
  • Die BigQuery-Systemzeitspalte, definiert durch _CHANGE_TIMESTAMP, ist die einzige unterstützte Zeitstempelspalte für Join-Vorgänge. Nutzerdefinierte Spalten werden nicht unterstützt.
  • Bei Verwendung in Verbindung mit Fensteraggregationen müssen alle dokumentierten Einschränkungen für Fensteraggregationen eingehalten werden.

Kosten

Kontinuierliche Abfragen in BigQuery werden basierend auf der während der Ausführung des Jobs verbrauchten Rechenkapazität (Slots) abgerechnet. Dieses rechenbasierte Modell gilt auch für zustandsbehaftete Vorgänge wie Joins. Da Joins erfordern, dass das System den „Zustand“ speichert, während die Abfrage aktiv ist, verbrauchen sie zusätzliche Slot-Ressourcen. Mehr Kontext oder Daten, die in einem Join gespeichert sind, z. B. wenn Sie längere Zeitintervalle in der JOIN- oder WHERE-Klausel verwenden, erfordern die Beibehaltung eines größeren Zustands, was zu einer höheren Slot-Auslastung führt.

Nächste Schritte