Java용 Cloud 클라이언트 라이브러리에서 스트림 호출

스트리밍 호출을 사용하면 간단한 요청/응답보다 더 복잡한 상호작용 패턴이 가능해져 단일 연결을 통해 여러 메시지를 전송하거나 수신할 수 있습니다.

자바용 Cloud 클라이언트 라이브러리는 다음 세 가지 유형의 스트리밍 호출을 지원합니다.

  • 서버 스트리밍: 서버가 응답 스트림을 다시 보냅니다.
  • 클라이언트 스트리밍: 요청 스트림을 서버에 전송합니다.
  • 양방향 스트리밍: 서버에 요청 스트림을 전송할 수 있으며 서버는 응답 스트림을 다시 전송할 수 있습니다.

스트리밍 구현은 서버, 클라이언트, 양방향 스트리밍을 위한 gRPC-Java 구현을 기반으로 모델링됩니다.

전송 간 스트리밍 지원

스트리밍은 gRPC를 사용하는 경우 완전히 지원되지만 HttpJson의 경우 부분적으로만 지원됩니다. 스트리밍 지원은 다음 표를 참고하세요.

스트리밍 유형 gRPC HttpJson
서버 스트리밍 지원됨 지원됨
클라이언트 스트리밍 지원됨 지원되지 않음
양방향 스트리밍 지원됨 지원되지 않음

단항 호출 (비스트리밍)은 gRPC와 HttpJson 모두에서 지원됩니다.

스트리밍 유형 확인

호출의 스트리밍 유형을 확인하려면 반환된 Callable 유형을 확인하세요.

  • ServerStreamingCallable: 서버 스트리밍
  • ClientStreamingCallable: 클라이언트 스트리밍입니다.
  • BidiStreamingCallable: 양방향 스트리밍입니다.

예를 들어 Java-Aiplatform 및 Java-Speech를 사용하는 경우:

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

스트리밍 호출하기

스트리밍 호출은 서버 스트리밍을 사용하는지 양방향 스트리밍을 사용하는지에 따라 다릅니다.

서버 스트리밍

서버 스트리밍에는 추가 구현이 필요하지 않습니다. ServerStream 클래스를 사용하면 응답 스트림을 반복할 수 있습니다. Java-Maps-Routing을 예로 들어 서버 스트리밍 API를 호출하는 방법을 보여줍니다.

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

이 예시에서 클라이언트는 단일 ComputeRouteMatrixRequest를 전송하고 응답 스트림을 수신합니다.

양방향 스트리밍

양방향 스트리밍에는 호출을 위한 추가 구현이 필요합니다. Java-Speech를 예로 사용하여 다음 단계에서는 양방향 스트리밍 호출을 실행하는 예시 구현을 제공합니다.

먼저 다음 코드를 가이드라인으로 사용하여 ResponseObserver 인터페이스를 구현합니다.

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

이어서 다음 단계를 수행합니다.

  1. 관찰자 인스턴스를 만듭니다. java BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
  2. 관찰자를 호출 가능 항목에 전달합니다. java ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
  3. 서버에 요청을 보내고 완료되면 스트림을 닫습니다. java clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend();
  4. 응답을 반복합니다. ```java List responses = responseObserver.getFuture().get();

    for (StreamingRecognizeResponse response : responses) { // Do something with response } ```

지원되지 않는 스트리밍 오류

gRPC와 HTTP/JSON 전송을 모두 지원하는 클라이언트 라이브러리의 경우 지원되지 않는 스트리밍 호출을 호출하도록 클라이언트 라이브러리를 실수로 구성할 수 있습니다. 예를 들어 다음 구성은 양방향 스트리밍 호출을 실행하는 Java-Speech의 HttpJson 클라이언트를 보여줍니다.

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

이렇게 하면 컴파일 오류가 발생하지는 않지만 런타임 오류로 표시됩니다.

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.