Очистка Redisson pub / дополнительных слушателей, когда связанный объект «устаревший»

#java #websocket #redis #redisson

#java #websocket #redis #redisson

Вопрос:

Я пытаюсь реализовать простое приложение Websocket на Java, которое может масштабироваться по горизонтали, используя Redis и библиотеку Redisson.

Сервер Websocket в основном отслеживает подключенных клиентов и отправляет сообщения, которые принимаются на Rtopic — это отлично работает.

Для использования у меня есть код, который добавляет прослушивателя при регистрации клиента: он связал объект клиента со слушателем с помощью:

 private static RedissonClient redisson = RedissonRedisServer.createRedisConnectionWithConfig();
public static final RTopic subcriberTopic = redisson.getTopic("clientsMapTopic");

public static boolean sendToPubSub(ConnectedClient q, String message) {
        boolean[] success = {true};
        MessageListener<Message> listener = new MessageListener<Message>() {
            @Override
            public void onMessage(CharSequence channel, Message message) {
                logger.debug("The message is : "   message.getMediaId());

                try {
                    logger.debug("ConnectedClient mediaid: "   q.getMediaid()   ",Message mediaid "   message.getMediaId());
                    if (q.getMediaid().equals(message.getMediaId())) {
                        // we need to verify if the message goes to the right receiver
                        logger.debug("MESSAGE from PUBSUB to ("   q.getId()   ") @ "   q.getSession().getId()   " "   message);
                        // this is the actual message to the websocket client
                        // this executes on the wrong connected client when the connection is closed and reopened
                        q.getSession().getBasicRemote().sendText(message.getMessage());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    success[0] = false;
                }
            }
        };
        int listenerId = subcriberTopic.addListener(Message.class, listener);
}
 

Проблема, которую я наблюдаю, заключается в следующем:

  • первоначальное соединение с клиентом регистрирует прослушиватель, связанный с этим объектом
  • отправленное сообщение на сервер ws принимается слушателем и отправляется правильно
  • отключите websocket — создайте новое соединение — будет создан новый прослушиватель
  • отправленное сообщение на сервер ws получает тот же исходный прослушиватель и использует этот подключенный клиент вместо вновь зарегистрированного
  • сбой отправки (поскольку клиентское и ws-соединение не существует) и не обрабатывается дальше

Кажется, мне просто нужно удалить прослушиватель для клиента, если клиент удаляется, но я не нашел хорошего способа сделать это, потому что, хотя я вижу в отладчике, что у прослушивателя есть связанный подключенный клиентский объект, я не могу получить их без добавления кода для этого.

Правильно ли я это наблюдаю и каков хороший способ заставить это работать должным образом?

Ответ №1:

Когда я писал вопрос, я как бы склонялся к ответу, который я имел в виду, и попробовал, который сработал. Я добавил ConcurrentHashMap для отслеживания связи между подключенным клиентом и слушателем. В логике, в которой я обрабатывал ошибку websocket, указывающую на удаление клиента, я затем удалил связанного прослушивателя (и запись с карты). Теперь все работает так, как ожидалось.

небольшой фрагмент:

 int listenerId = subcriberTopic.addListener(Message.class, listener);
clientListeners.put(q,(Integer)listenerId);
 

А затем в обработчике onError websocket, который запускает очистку:

 // remove the associated listener
int listenerIdForClient = MessageContainer.clientListeners.get(cP);
MessageContainer.subcriberTopic.removeListener((Integer) listenerIdForClient);
// remove entry from map
MessageContainer.clientListeners.remove(cP);
 

Теперь слушатель очищается должным образом, и в следующий раз создается новый слушатель и обрабатывает сообщения.