#angular #rxjs #grpc #rxjs-observables
Вопрос:
Я работаю над привязкой потоковых данных, которые потребляются из вызова потока службы gRPC. Потоковые данные отправляются каждые X миллисекунд из службы gRPC, и я привязываю их как наблюдаемые данные к div в html, например, к простому ngIf*.
Мне нужно обработать случай, когда служба не отправляет никаких данных через X секунд, я не должен показывать div. Поэтому планирую явно установить значение null в переменной привязки только в том случае, если поток не будет получен через X секунд.
Поэтому я пытаюсь найти лучший способ сделать это с помощью операторов RxJS. Я смотрел на интервал, таймер, карту переключения, но не знал, как ее правильно реализовать.
Сервис.ts
@Injectable()
export class ApiMyTestService {
client: GrpcMyTestServiceClient;
public readonly testData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient) {
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.testData$ = this.listCoreStream();
}
listCoreStream(): Observable <TestReply.AsObject> {
return new Observable(obs => {
const req = new SomeRequest();
req.setClientName("GrpcWebTestClient");
const stream = this.client.getCoreUpdates(req); // Main gRPC Server stream Api call
stream.on('status', (status: Status) => {
console.log('ApiService.getStream.status', status);
});
stream.on('data', (message: any) => {
// gRPC Api call returns stream data here. Every X millisec service sends stream data.
console.log('ApiService.getStream.data', message.toObject());
obs.next(message.toObject() as TestReply.AsObject);
});
stream.on('end', () => {
console.log('ApiService.getStream.end');
obs.complete();
});
});
}
Компонент.ts
export class AppComponent implements AfterViewInit, OnDestroy {
public testReply?: TestReply.AsObject;
private _subscription: Subscription;
constructor(private readonly _MyTestService: ApiMyTestService) {
this._subscription = new Subscription();
}
public ngAfterViewInit(): void {
this._subscription = this._MyTestService.testData$.subscribe((data) => {
//testReply will be binded to a div, so hide div if null data
// how to set to null when the service didn't stream any data after X seconds
this.testReply = data;
});
}
HTML:
//show this div only when we have stream data. Hide it when no data is binded for X secs!
<div *ngIf=(testReply)>
html elements that uses testReply
</div>
Ответ №1:
Вы можете создать наблюдаемое, которое будет выдавать значения из вашего сервиса, но также будет выдавать undefined
через x секунд:
public testReply$ = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
Вот как это работает:
merge
создает опаление, наблюдаемое из нескольких источников, которое будет излучаться при излучении любого источника. В этом случае источниками являются:switchMap
подписывается на это «внутреннее наблюдаемое» для нас и испускает его выбросы. Он также будет заменять внутреннюю наблюдаемую на новую каждый раз, когда он получает излучение. Таким образом,timer()
он будет излучать только в том случае, еслиswitchMap
не получит следующий выброс в течение 5000 мс.
Теперь вы просто используете testReply$
:
public ngAfterViewInit(): void {
this._subscription = this.testReply$.subscribe(
data => this.testReply = data
);
}
Чтобы немного упростить процесс, вы можете использовать AsyncPipe
его в своем шаблоне, и вам не нужно беспокоиться о подписках:
<div *ngIf="testReply$ | async as testReply">
<p> {{ testReply }} </p>
</div>
export class AppComponent {
public testReply$: Observable<TestReply.AsObject> = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
constructor(private readonly _MyTestService: ApiMyTestService) { }
}
Комментарии:
1. Да, это сработало. Спасибо @BizzyBob за объяснение и уточнение с дополнительными деталями.