Как скрыть div, если поток gRPC(наблюдаемые) данные не получены через X секунд?

#angular #rxjs #grpc #rxjs-observables

Вопрос:

Я работаю над привязкой потоковых данных, которые потребляются из вызова потока службы gRPC. Потоковые данные отправляются каждые X миллисекунд из службы gRPC, и я привязываю их как наблюдаемые данные к div в html, например, к простому ngIf*.

Мне нужно обработать случай, когда служба не отправляет никаких данных через X секунд, я не должен показывать div. Поэтому планирую явно установить значение null в переменной привязки только в том случае, если поток не будет получен через X секунд.

Поэтому я пытаюсь найти лучший способ сделать это с помощью операторов RxJS. Я смотрел на интервал, таймер, карту переключения, но не знал, как ее правильно реализовать.

Сервис.ts

   @Injectable()
    export class ApiMyTestService {    
        client: GrpcMyTestServiceClient;
        public readonly testData$: Observable<TestReply.AsObject>;
  
    constructor(private readonly http: HttpClient) {
        this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
        this.testData$ = this.listCoreStream();
    }
  
   listCoreStream(): Observable <TestReply.AsObject> {
    return new Observable(obs => {      
      const req = new SomeRequest(); 
      req.setClientName("GrpcWebTestClient");
      const stream = this.client.getCoreUpdates(req); // Main gRPC Server stream Api call 
      
      stream.on('status', (status: Status) => {
        console.log('ApiService.getStream.status', status);
      });
      
      stream.on('data', (message: any) => {
        // gRPC Api call returns stream data here. Every X millisec service sends stream data.
        console.log('ApiService.getStream.data', message.toObject());
        obs.next(message.toObject() as TestReply.AsObject);
      });
      
      stream.on('end', () => {
        console.log('ApiService.getStream.end');
        obs.complete();        
      });     
    });
  }
 

Компонент.ts

 export class AppComponent implements AfterViewInit, OnDestroy {   
   public testReply?: TestReply.AsObject;   
   private _subscription: Subscription;

    constructor(private readonly _MyTestService: ApiMyTestService) {     
      this._subscription = new Subscription();
    }

    public ngAfterViewInit(): void {    
        this._subscription = this._MyTestService.testData$.subscribe((data) => {           
           //testReply will be binded to a div, so hide div if null data
           // how to set to null when the service didn't stream any data after X seconds
           this.testReply = data;
            
        });
}
 

HTML:

 //show this div only when we have stream data. Hide it when no data is binded for X secs!
<div *ngIf=(testReply)>
    html elements that uses testReply
</div>
 

Ответ №1:

Вы можете создать наблюдаемое, которое будет выдавать значения из вашего сервиса, но также будет выдавать undefined через x секунд:

 public testReply$ = this._MyTestService.testData$.pipe(
    switchMap(data => merge(
        of(data), 
        timer(5000).pipe(mapTo(undefined))
    ))
);
 

Вот как это работает:

  • merge создает опаление, наблюдаемое из нескольких источников, которое будет излучаться при излучении любого источника. В этом случае источниками являются:
    • Значение, полученное от службы (). data of используется для создания наблюдаемого из нашего простого значения
    • Наблюдаемое, которое будет излучать undefined через 5000 мс. timer() будет выдавать номер 0 по истечении указанного времени, поэтому вместо этого мы используем mapTo для выдачи undefined .
  • switchMap подписывается на это «внутреннее наблюдаемое» для нас и испускает его выбросы. Он также будет заменять внутреннюю наблюдаемую на новую каждый раз, когда он получает излучение. Таким образом, timer() он будет излучать только в том случае, если switchMap не получит следующий выброс в течение 5000 мс.

Теперь вы просто используете testReply$ :

 public ngAfterViewInit(): void {    
    this._subscription = this.testReply$.subscribe(
        data => this.testReply = data
    );
}
 

Чтобы немного упростить процесс, вы можете использовать AsyncPipe его в своем шаблоне, и вам не нужно беспокоиться о подписках:

 <div *ngIf="testReply$ | async as testReply">
    <p> {{ testReply }} </p>
</div>
 
 export class AppComponent {

  public testReply$: Observable<TestReply.AsObject> = this._MyTestService.testData$.pipe(
    switchMap(data => merge(
      of(data), 
      timer(5000).pipe(mapTo(undefined))
    ))
  );

  constructor(private readonly _MyTestService: ApiMyTestService) { }

}
 

Комментарии:

1. Да, это сработало. Спасибо @BizzyBob за объяснение и уточнение с дополнительными деталями.