Модель потоковой передачи сервера Java gRPC

#android #grpc #mobile-application #grpc-java

#Android #grpc #мобильное приложение #grpc-java

Вопрос:

Я ищу пример с прослушивателем на стороне клиента для потоковой передачи на стороне сервера, следуя примерам здесь — https://grpc.io/docs/languages/java/basics /

Я настроил текущие конфигурации на клиенте и сервере в соответствии с документом — https://github.com/grpc/grpc/blob/master/doc/keepalive.md

Все примеры показывают, что при отправке запроса на сервер сервер будет отвечать как асинхронная или унарная модель, или клиент может общаться с новым запросом. Каждый раз, когда запрос должен быть отправлен на сервер для ответа. И соединение остается в живых гораздо дольше; что хорошо.

Вопрос: Возможно ли, чтобы клиент отправлял один запрос и бесконечно получал ответы от сервера без использования модели BIDI (двунаправленной)? в этом сценарии было бы несколько основано на событиях, что всякий раз, когда на сервере имеются данные, они будут продолжать отправку, не требуя дальнейшего запроса. Опять же, это сообщение является одноадресным, а не широковещательным; это означает, что каждое соединение (пользователь) уникально для ответа на основе пользовательских данных.

Не уверен, но может быть один из способов — сервер должен сохранять объект запроса активным где-то в кэше и продолжать отвечать? Должен ли клиент использовать цикл опроса для бесконечного прослушивания ответов?

С нетерпением ждем примеров, если таковые имеются, которые могут решить эту проблему.

Ответ №1:

Один запрос клиента и множество ответов сервера — это «потоковая передача с сервера». Вы бы использовали stream ключевое слово только для ответа.

Вы можете использовать блокирующий или асинхронный API для RPC с потоковой передачей на сервер. В примере используется API блокировки, поскольку операции блокировки в целом гораздо более понятны. Но можно было бы использовать и асинхронную заглушку.

Для потоковой передачи с сервера асинхронный клиентский API позволяет передавать запрос и наблюдатель ответа для запуска вызова. После этого он будет выглядеть так же, как в примере с bidi, за исключением того, что клиент больше не будет отправлять сообщения. Сервер может вызывать StreamObserver всякий раз, когда он хочет отправить сообщение, а StreamObserver клиента будет вызываться при поступлении нового сообщения.

Чтобы увидеть пример, вы можете заменить блокирующую заглушку listFeatures() на асинхронную заглушку:

 final CountDownLatch finishLatch = new CountDownLatch(1);
asyncStub.listFeatures(request, new StreamObserver<Feature>() {
  int i;

  @Override public void onNext(Feature feature) {
    info("Result #"   i   ": {0}", feature);
    if (testHelper != null) {
      testHelper.onMessage(feature);
    }
    i  ;
  }

  @Override public void onCompleted() {
    finishLatch.countDown();
  }

  @Override public void onError(Throwable t) {
    warning("RPC failed: {0}", Status.fromThrowable(t));
    if (testHelper != null) {
      testHelper.onRpcError(t);
    }
    finishLatch.countDown();
  }
});
finishLatch.await();