как обрабатывать случай закрытия браузера (действие / событие) в потоковом режиме RxJava2 с помощью SSE (отправленные сервером события)

#rx-java2 #publish #server-sent-events #reactive-streams

#rx-java2 #опубликовать #server-sent-events #реактивные потоки

Вопрос:

Я хочу получать уведомления, если потребитель закрывает браузер или каким-либо образом отключается от потока SSE.

в качестве примера, когда соединение начинается с curl, doOnSubscribe вызывается операция, и значение connectedDeliveryCount увеличивается, но никогда не уменьшается, хотя я специально закрыл соединение с помощью ctrl-c. ни одно из событий не вызывается как doOnComplete или doOnTerminate или doOnCancel

пример соединения с get

 faruk@virtualBox:/usr/lib/jvm$ curl -X GET http://localhost:8080/delivery-stream -v
Note: Unnecessary use of -X or --request, GET is already inferred.
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /delivery-stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.58.0
> Accept: */*
> 
^C
faruk@virtualBox:/usr/lib/jvm$
  

блок кода :

 static AtomicInteger connectedDeliveryCount = new AtomicInteger();

@GetMapping(path = "/delivery-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flowable<String> deliveryStream() {
        return ReactiveStreamsOrderService.deliverySubjectToFlowable()
                .doOnCancel(() -> System.out.println("{doOnCancel}: xxx"))
                .doOnComplete(() -> System.out.println("{doOnComplete}: xxx"))
                .doOnSubscribe(disposable -> System.out.println("{doOnSubscribe}: xxx"))
                .doOnTerminate(() -> System.out.println("{doOnTerminate}: xxx"))
                .onErrorReturn(throwable -> "xxx" throwable.getMessage())
                .doOnSubscribe(subscription -> connectedDeliveryCount.incrementAndGet())
                .doOnTerminate(() -> connectedDeliveryCount.decrementAndGet())
                ;
    }
  

Ответ №1:

Я хочу получать уведомления, если потребитель закрывает браузер или каким-либо образом отключается от потока SSE.

Когда вы отправляете что-либо в сокет, если клиент отключился, вы получаете сообщение об ошибке закрытия сокета. По моему опыту, вы не обнаружите этого, пока не попытаетесь использовать сокет TCP / IP.

Таким образом, решение заключается в отправке регулярных сообщений «keep-alive» по сокету, даже если у вас нет никакой информации для отправки им. Например, каждые 10 секунд отправляйте временную метку или строку состояния сервера. Когда это вызывает ошибку, вы знаете, что клиент отключился в какой-то момент за последние 10 секунд.

Я не знаю rx-java, извините, но я предполагаю, что вы получите ошибку в своем doOnTerminate обработчике.