Java 版 Cloud 客户端库上的流式调用

与简单的请求/响应相比,流式调用可实现更复杂的互动模式,从而允许通过单个连接发送或接收多条消息。

Java 版 Cloud 客户端库支持三种类型的流式调用:

  • 服务器流式传输:服务器向您发送一系列响应。
  • 客户端流式传输:您向服务器发送一系列请求。
  • 双向流式传输:您可以向服务器发送一系列请求,而服务器可以向您发回一系列响应。

流式实现以 gRPC-Java 实现为模型,适用于服务器、客户端和双向流式传输。

跨传输的流式传输支持

使用 gRPC 时,系统完全支持流式传输;但对于 HttpJson,系统仅部分支持流式传输。如需了解支持的流式传输服务,请参阅下表。

流媒体服务类型 gRPC HttpJson
服务器流式传输 支持 支持
客户端流式传输 支持 不支持
双向流式传输 支持 不支持

gRPC 和 HttpJson 均支持一元调用(非流式传输)。

确定流式传输的类型

如需确定调用的流式传输类型,请检查返回的 Callable 类型:

  • ServerStreamingCallable:服务器流式传输。
  • ClientStreamingCallable:客户端流式传输。
  • BidiStreamingCallable:双向流式传输。

例如,使用 Java-Aiplatform 和 Java-Speech:

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

发出流式调用

发出流式调用会有所不同,具体取决于您使用的是服务器流式传输还是双向流式传输。

服务器流式传输

服务器流式传输不需要额外的实现。借助 ServerStream 类,您可以迭代响应流。以 Java-Maps-Routing 为例,以下代码展示了如何调用服务器流式传输 API:

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

在此示例中,客户端发送单个 ComputeRouteMatrixRequest 并接收一系列响应。

双向流式传输

双向流式传输需要额外的实现才能进行调用。 以 Java-Speech 为例,以下步骤提供了一个双向流式调用的示例实现。

首先,使用以下代码作为指南来实现 ResponseObserver 接口:

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

然后,请按下列步骤操作:

  1. 创建观察者实例: java BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
  2. 将观察器传递给可调用对象: java ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
  3. 向服务器发送请求,并在完成后关闭数据流: java clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend();
  4. 遍历响应: ```java List responses = responseObserver.getFuture().get();

    for (StreamingRecognizeResponse response : responses) { // Do something with response } ```

不支持的流式传输错误

对于同时支持 gRPC 和 HTTP/JSON 传输的客户端库,可能会意外配置客户端库以调用不受支持的流式传输调用。例如,以下配置显示了 Java-Speech 的 HttpJson 客户端进行双向流式调用的情况:

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

这不会导致编译错误,但会显示为运行时错误:

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.