Streaming-Aufrufe in Cloud-Clientbibliotheken für Java

Streaming-Aufrufe ermöglichen komplexere Interaktionsmuster als einfache Anfragen/Antworten, da über eine einzelne Verbindung mehrere Nachrichten gesendet oder empfangen werden können.

Cloud-Clientbibliotheken für Java unterstützen drei Arten von Streamingaufrufen:

  • Serverstreaming:Der Server sendet einen Stream von Antworten an Sie zurück.
  • Client-Streaming:Sie senden einen Stream von Anfragen an den Server.
  • Bidirektionales Streaming:Sie können einen Stream von Anfragen an den Server senden und der Server kann einen Stream von Antworten an Sie zurücksenden.

Die Streaming-Implementierungen sind an die gRPC-Java-Implementierungen für Server-, Client- und bidirektionales Streaming angelehnt.

Streaming-Unterstützung für alle Transportarten

Streaming wird bei Verwendung von gRPC vollständig unterstützt, bei HttpJson jedoch nur teilweise. Weitere Informationen zur Unterstützung von Streaming finden Sie in der folgenden Tabelle.

Streamingtyp gRPC HttpJson
Server-Streaming Unterstützt Unterstützt
Client-Streaming Unterstützt Nicht unterstützt
Bidirektionales Streaming Unterstützt Nicht unterstützt

Unäre Aufrufe (nicht Streaming) werden sowohl für gRPC als auch für HttpJson unterstützt.

Art des Streamings bestimmen

Um den Streamingtyp des Anrufs zu ermitteln, prüfen Sie den zurückgegebenen Typ Callable:

  • ServerStreamingCallable:Serverstreaming.
  • ClientStreamingCallable:Client-Streaming.
  • BidiStreamingCallable:Bidirektionales Streaming.

Beispiel: Java-Aiplatform und Java-Speech verwenden:

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

Streamingaufrufe ausführen

Wie ein Streamingaufruf erfolgt, hängt davon ab, ob Sie Server-Streaming oder bidirektionales Streaming verwenden.

Server-Streaming

Für das Server-Streaming ist keine zusätzliche Implementierung erforderlich. Mit der Klasse ServerStream können Sie den Stream von Antworten durchlaufen. Das folgende Beispiel zeigt, wie die Server Streaming API mit Java-Maps-Routing aufgerufen wird:

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

In diesem Beispiel sendet der Client ein einzelnes ComputeRouteMatrixRequest und empfängt einen Stream von Antworten.

Bidirektionales Streaming

Für bidirektionales Streaming ist eine zusätzliche Implementierung erforderlich, um den Anruf zu starten. Anhand von Java-Speech wird in den folgenden Schritten eine Beispielimplementierung für einen bidirektionalen Streaming-Aufruf gezeigt.

Implementieren Sie zuerst die ResponseObserver-Schnittstelle. Verwenden Sie dazu den folgenden Code als Richtlinie:

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

Führen Sie dann folgende Schritte aus:

  1. Erstellen Sie eine Instanz des Observers: java BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
  2. Übergeben Sie den Observer an das aufrufbare Objekt: java ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
  3. Senden Sie die Anfragen an den Server und schließen Sie den Stream, wenn Sie fertig sind: java clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend();
  4. Antworten durchlaufen: ```java List responses = responseObserver.getFuture().get();

    for (StreamingRecognizeResponse response : responses) { // Do something with response } ```

Nicht unterstützte Streamingfehler

Bei Clientbibliotheken, die sowohl gRPC- als auch HTTP/JSON-Transports unterstützen, kann es passieren, dass die Clientbibliothek versehentlich so konfiguriert wird, dass ein nicht unterstützter Streamingaufruf erfolgt. Die folgende Konfiguration zeigt beispielsweise, wie der HttpJson-Client von Java-Speech einen bidirektionalen Streamingaufruf ausführt:

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

Dies führt nicht zu einem Kompilierungsfehler, sondern zu einem Laufzeitfehler:

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.