#angular #websocket #rxjs
#angular #websocket #rxjs
Вопрос:
Совместное использование соединения websocket между несколькими компонентами дублирует соединение.
export class NotificationService {
notification;
destroy$ = new Subject();
constructor(private appService: AppService) {}
createWs() {
this.notification = this.appService
.getTicket()
.pipe(
mergeMap(data =>
webSocket(`wss://echo.websocket.org/`).pipe(
takeUntil(this.destroy$)
)
)
)
.pipe(
catchError(e => {
return throwError(e);
}),
retryWhen(errors =>
errors.pipe(
concatMap((_, iteration) => timer(Math.pow(2, iteration) * 1000)),
take(10)
)
)
);
}
filterNotification() {
if (!this.notification) {
this.createWs();
}
return this.notification;
}
}
Вызов this.notificationService.filterNotification()
различных компонентов создает множественные подключения к websocket. Согласно документации здесь,
Когда WebSocketSubject подписан, он пытается установить соединение с сокетом, если оно уже не установлено. Это означает, что многие подписчики всегда будут прослушивать один и тот же сокет, тем самым экономя ресурсы. Однако, если созданы два экземпляра WebSocketSubject, даже если этим двум был предоставлен один и тот же URL, они попытаются установить отдельные подключения
Новый экземпляр не создается, но подписка дублирует соединение.
Комментарии:
1. попробуйте использовать
switchMap
вместоmergeMap
, честно говоря, я не понимаю необходимости подписываться наwebSocket
многократно каждый раз, когда наблюдаемый, возвращаемыйgetTicket()
emits2. На самом деле мне нужен getTicket — http-запрос — для добавления токена в URL-адрес websocket
3. При каждом
createWs()
вызовеthis.notification
присваивается новое наблюдаемое, но предыдущее значение сохраняется подписанным4.
createWs
вызывается только один раз
Ответ №1:
Запускал mergeMap
новый экземпляр websocket для каждой подписки, что приводило к дублированию. Сохраняя ссылку на внутреннюю наблюдаемую, вместо открытия нового websocket возвращается более поздний
this.notification = this.appService
.getTicket()
.pipe(
mergeMap(data =>
this.ws ? this.ws : this.ws = webSocket(`wss://echo.websocket.org/`).pipe(
takeUntil(this.destroy$)
)
)
)