Java 適用的 Cloud 用戶端程式庫中的串流呼叫

相較於簡單的請求/回應,串流呼叫可實現更複雜的互動模式,透過單一連線傳送或接收多則訊息。

Java 適用的 Cloud 用戶端程式庫支援三種串流呼叫:

  • 伺服器串流:伺服器會將一連串回應傳回給您。
  • 用戶端串流:您會將一連串要求傳送至伺服器。
  • 雙向串流:您可以將一連串要求傳送至伺服器,伺服器也會回傳一連串回應。

串流實作項目是以 gRPC-Java 實作項目為模型,適用於伺服器、用戶端和雙向串流。

支援各種傳輸方式的串流

使用 gRPC 時,系統完全支援串流,但僅部分支援 HttpJson。如需串流支援服務,請參閱下表。

串流類型 gRPC HttpJson
伺服器串流 有權限 有權限
用戶端串流 有權限 不支援
雙向串流 有權限 不支援

gRPC 和 HttpJson 都支援一元呼叫 (非串流)。

判斷串流類型

如要判斷通話的串流類型,請檢查傳回的 Callable 類型:

  • ServerStreamingCallable伺服器串流。
  • ClientStreamingCallable用戶端串流。
  • BidiStreamingCallable雙向串流。

舉例來說,使用 Java-Aiplatform 和 Java-Speech:

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

撥打串流通話

視您使用伺服器串流或雙向串流而定,進行串流呼叫的方式會有所不同。

伺服器串流

伺服器串流不需要額外導入。ServerStream 類別可讓您疊代回應串流。以 Java-Maps-Routing 為例,以下說明如何呼叫伺服器串流 API:

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

在本例中,用戶端會傳送單一 ComputeRouteMatrixRequest,並接收一連串的回應。

雙向串流

雙向串流需要額外實作才能發出呼叫。 以 Java-Speech 為例,下列步驟提供雙向串流呼叫的實作範例。

首先,請使用下列程式碼做為指引,實作 ResponseObserver 介面:

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

接下來,請按照下列步驟進行:

  1. 建立觀察器執行個體: java BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
  2. 將觀察程式傳遞至可呼叫項目: java ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
  3. 將要求傳送至伺服器,並在完成後關閉串流: java clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend();
  4. 逐一查看回覆: ```java List responses = responseObserver.getFuture().get();

    for (StreamingRecognizeResponse response : responses) { // Do something with response } ```

不支援的串流錯誤

如果用戶端程式庫同時支援 gRPC 和 HTTP/JSON 傳輸,可能會不小心將用戶端程式庫設定為叫用不支援的串流呼叫。舉例來說,下列設定顯示 Java-Speech 的 HttpJson 用戶端發出雙向串流呼叫:

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

這不會導致編譯錯誤,但會顯示為執行階段錯誤:

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.