#java #websocket #redis #redisson
#java #websocket #redis #redisson
Вопрос:
Я пытаюсь реализовать простое приложение Websocket на Java, которое может масштабироваться по горизонтали, используя Redis и библиотеку Redisson.
Сервер Websocket в основном отслеживает подключенных клиентов и отправляет сообщения, которые принимаются на Rtopic — это отлично работает.
Для использования у меня есть код, который добавляет прослушивателя при регистрации клиента: он связал объект клиента со слушателем с помощью:
private static RedissonClient redisson = RedissonRedisServer.createRedisConnectionWithConfig();
public static final RTopic subcriberTopic = redisson.getTopic("clientsMapTopic");
public static boolean sendToPubSub(ConnectedClient q, String message) {
boolean[] success = {true};
MessageListener<Message> listener = new MessageListener<Message>() {
@Override
public void onMessage(CharSequence channel, Message message) {
logger.debug("The message is : " message.getMediaId());
try {
logger.debug("ConnectedClient mediaid: " q.getMediaid() ",Message mediaid " message.getMediaId());
if (q.getMediaid().equals(message.getMediaId())) {
// we need to verify if the message goes to the right receiver
logger.debug("MESSAGE from PUBSUB to (" q.getId() ") @ " q.getSession().getId() " " message);
// this is the actual message to the websocket client
// this executes on the wrong connected client when the connection is closed and reopened
q.getSession().getBasicRemote().sendText(message.getMessage());
}
} catch (Exception e) {
e.printStackTrace();
success[0] = false;
}
}
};
int listenerId = subcriberTopic.addListener(Message.class, listener);
}
Проблема, которую я наблюдаю, заключается в следующем:
- первоначальное соединение с клиентом регистрирует прослушиватель, связанный с этим объектом
- отправленное сообщение на сервер ws принимается слушателем и отправляется правильно
- отключите websocket — создайте новое соединение — будет создан новый прослушиватель
- отправленное сообщение на сервер ws получает тот же исходный прослушиватель и использует этот подключенный клиент вместо вновь зарегистрированного
- сбой отправки (поскольку клиентское и ws-соединение не существует) и не обрабатывается дальше
Кажется, мне просто нужно удалить прослушиватель для клиента, если клиент удаляется, но я не нашел хорошего способа сделать это, потому что, хотя я вижу в отладчике, что у прослушивателя есть связанный подключенный клиентский объект, я не могу получить их без добавления кода для этого.
Правильно ли я это наблюдаю и каков хороший способ заставить это работать должным образом?
Ответ №1:
Когда я писал вопрос, я как бы склонялся к ответу, который я имел в виду, и попробовал, который сработал. Я добавил ConcurrentHashMap для отслеживания связи между подключенным клиентом и слушателем. В логике, в которой я обрабатывал ошибку websocket, указывающую на удаление клиента, я затем удалил связанного прослушивателя (и запись с карты). Теперь все работает так, как ожидалось.
небольшой фрагмент:
int listenerId = subcriberTopic.addListener(Message.class, listener);
clientListeners.put(q,(Integer)listenerId);
А затем в обработчике onError websocket, который запускает очистку:
// remove the associated listener
int listenerIdForClient = MessageContainer.clientListeners.get(cP);
MessageContainer.subcriberTopic.removeListener((Integer) listenerIdForClient);
// remove entry from map
MessageContainer.clientListeners.remove(cP);
Теперь слушатель очищается должным образом, и в следующий раз создается новый слушатель и обрабатывает сообщения.