Java 用 Cloud クライアント ライブラリのストリーミング呼び出し

ストリーミング呼び出しでは、単純なリクエスト/レスポンスよりも複雑なインタラクション パターンが可能になり、単一の接続で複数のメッセージを送受信できます。

Java 用 Cloud クライアント ライブラリは、次の 3 種類のストリーミング呼び出しをサポートしています。

  • サーバー ストリーミング: サーバーがレスポンスのストリームを返送します。
  • クライアント ストリーミング: リクエストのストリームをサーバーに送信します。
  • 双方向ストリーミング: リクエストのストリームをサーバーに送信し、サーバーからレスポンスのストリームを返信できます。

ストリーミング実装は、サーバー、クライアント、双方向ストリーミングの gRPC-Java 実装をモデル化しています。

トランスポート間のストリーミングのサポート

ストリーミングは、gRPC を使用する場合は完全にサポートされますが、HttpJson では部分的にのみサポートされます。ストリーミングのサポートについては、次の表をご覧ください。

ストリーミングのタイプ gRPC HttpJson
サーバー ストリーミング サポート対象 サポート対象
クライアント ストリーミング サポート対象 サポート対象外
双方向ストリーミング サポート対象 サポート対象外

単項呼び出し(非ストリーミング)は、gRPC と HttpJson の両方でサポートされています。

ストリーミングのタイプを特定する

呼び出しのストリーミング タイプを判断するには、返された Callable タイプを確認します。

  • ServerStreamingCallable: サーバー ストリーミング。
  • ClientStreamingCallable: クライアント ストリーミング。
  • BidiStreamingCallable: 双方向ストリーミング。

たとえば、Java-Aiplatform と Java-Speech を使用する場合:

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

ストリーミング呼び出しを行う

ストリーミング呼び出しを行う方法は、サーバー ストリーミングと双方向ストリーミングのどちらを使用するかによって異なります。

サーバー ストリーミング

サーバー ストリーミングには追加の実装は必要ありません。ServerStream クラスを使用すると、レスポンスのストリームを反復処理できます。Java-Maps-Routing を例として、次のコードは Server Streaming API を呼び出す方法を示しています。

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

この例では、クライアントは単一の ComputeRouteMatrixRequest を送信し、レスポンスのストリームを受信します。

双方向ストリーミング

双方向ストリーミングでは、呼び出しを行うために追加の実装が必要です。Java-Speech を例として、次の手順では双方向ストリーミング呼び出しを行う実装例を示します。

まず、次のコードをガイドラインとして使用して、ResponseObserver インターフェースを実装します。

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

次に、以下の手順を行います。

  1. オブザーバーのインスタンスを作成します。 java BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
  2. オブザーバーを呼び出し可能オブジェクトに渡します。 java ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
  3. リクエストをサーバーに送信し、完了したらストリームを閉じます。 java clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend();
  4. レスポンスを反復処理します。 ```java List responses = responseObserver.getFuture().get();

    ```java for (StreamingRecognizeResponse response : responses) { // Do something with response } ```

サポートされていないストリーミング エラー

gRPC と HTTP/JSON の両方のトランスポートをサポートするクライアント ライブラリでは、サポートされていないストリーミング呼び出しを呼び出すようにクライアント ライブラリを誤って構成する可能性があります。たとえば、次の構成は、Java-Speech の HttpJson クライアントが双方向ストリーミング呼び出しを行うことを示しています。

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

これはコンパイル エラーにはなりませんが、実行時エラーとして表示されます。

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.