与简单的请求/响应相比,流式调用可实现更复杂的互动模式,从而允许通过单个连接发送或接收多条消息。
Java 版 Cloud 客户端库支持三种类型的流式调用:
- 服务器流式传输:服务器向您发送一系列响应。
- 客户端流式传输:您向服务器发送一系列请求。
- 双向流式传输:您可以向服务器发送一系列请求,而服务器可以向您发回一系列响应。
流式实现以 gRPC-Java 实现为模型,适用于服务器、客户端和双向流式传输。
跨传输的流式传输支持
使用 gRPC 时,系统完全支持流式传输;但对于 HttpJson,系统仅部分支持流式传输。如需了解支持的流式传输服务,请参阅下表。
| 流媒体服务类型 | gRPC | HttpJson |
|---|---|---|
| 服务器流式传输 | 支持 | 支持 |
| 客户端流式传输 | 支持 | 不支持 |
| 双向流式传输 | 支持 | 不支持 |
gRPC 和 HttpJson 均支持一元调用(非流式传输)。
确定流式传输的类型
如需确定调用的流式传输类型,请检查返回的 Callable 类型:
ServerStreamingCallable:服务器流式传输。ClientStreamingCallable:客户端流式传输。BidiStreamingCallable:双向流式传输。
例如,使用 Java-Aiplatform 和 Java-Speech:
// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();
// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
发出流式调用
发出流式调用会有所不同,具体取决于您使用的是服务器流式传输还是双向流式传输。
服务器流式传输
服务器流式传输不需要额外的实现。借助 ServerStream 类,您可以迭代响应流。以 Java-Maps-Routing 为例,以下代码展示了如何调用服务器流式传输 API:
try (RoutesClient routesClient = RoutesClient.create()) {
ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
routesClient.computeRouteMatrixCallable();
ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
ComputeRouteMatrixRequest.newBuilder().build());
for (RouteMatrixElement element : stream) {
// Do something with response
}
}
在此示例中,客户端发送单个 ComputeRouteMatrixRequest 并接收一系列响应。
双向流式传输
双向流式传输需要额外的实现才能进行调用。 以 Java-Speech 为例,以下步骤提供了一个双向流式调用的示例实现。
首先,使用以下代码作为指南来实现 ResponseObserver 接口:
class BidiResponseObserver<T> implements ResponseObserver<T> {
private final List<T> responses = new ArrayList<>();
private final SettableApiFuture<List<T>> future = SettableApiFuture.create();
@Override
public void onStart(StreamController controller) {
// no-op
}
@Override
public void onResponse(T response) {
responses.add(response);
}
@Override
public void onError(Throwable t) {
future.setException(t);
}
@Override
public void onComplete() {
future.set(responses);
}
public SettableApiFuture<List<T>> getFuture() {
return future;
}
}
然后,请按下列步骤操作:
- 创建观察者实例:
java BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>(); - 将观察器传递给可调用对象:
java ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver); - 向服务器发送请求,并在完成后关闭数据流:
java clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend(); 遍历响应: ```java List
responses = responseObserver.getFuture().get(); for (StreamingRecognizeResponse response : responses) { // Do something with response } ```
不支持的流式传输错误
对于同时支持 gRPC 和 HTTP/JSON 传输的客户端库,可能会意外配置客户端库以调用不受支持的流式传输调用。例如,以下配置显示了 Java-Speech 的 HttpJson 客户端进行双向流式调用的情况:
// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
// Bidi Callable is not supported in HttpJson
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
...
}
这不会导致编译错误,但会显示为运行时错误:
Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.