#angular #typescript #rxjs #rxjs-observables #rxjs-pipeable-operators
Вопрос:
Я получаю поток данных в качестве ответа от службы(grpc), например, 5 потоков в течение 2 секунд. Каждый поток — это наблюдаемый объект, на который я подписываюсь. Логика обработки, как только я получаю каждый поток, становится больше, и в каждом потоке есть какой-то тяжелый объект json, который содержит байтовую строку в кодировке base64, включает преобразование json в сложные типы и более условную логику. Таким образом, к тому времени, когда логика обработки для одного потока завершена, я не мог проверить другие потоки, так как они обрабатываются очень быстро (близко, как параллельные) и пропускаются. Поэтому мне нужно объединить все эти потоки Y(например, 5) после прослушивания в течение X секунд в наблюдаемый массив. Ниже приведен фрагмент кода.
MyTestService.ts:
@Injectable() export class MyTestService { client: GrpcMyTestServiceClient; public readonly coreData$: Observablelt;TestReply.AsObjectgt;; constructor(private readonly http: HttpClient) { this.client = new GrpcMyTestServiceClient(environment.apiProxyUri); this.coreData$ = this.listCoreStream(); } listCoreStream(): Observablelt;TestReply.AsObjectgt; { return new Observable(obs =gt; { const req = new SomeRequest(); //Get stream data from service(grpc) const stream = this.client.getCoreUpdates(req); stream.on('data', (message: any) =gt; { obs.next(message.toObject() as TestReply.AsObject); }); }); }
МиКомпонент.ts
public testReply?: TestReply.AsObject; private _subscription: Subscription; constructor(private readonly _MyTestService: MyTestService) { this._subscription = new Subscription(); } ngOnInit(): void { this._subscription = this._MyTestService.coreData$.subscribe((data) =gt; { if (data) { let obj = JSON.parse(data); //processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc.. } }); }
Поскольку данные слишком быстры за короткий промежуток времени, не все записи или обработаны. Это похоже на проблему синхронизации, которая невозможна в веб-мире, где последний поток, удовлетворяющий некоторому условию, перезаписывает предыдущий поток. Чтобы избежать этого и обработать весь поток по одному за раз, мне нужно объединить/объединить все потоки в массив, чтобы я мог повторить каждый из них внутри компонента, подписывающегося на наблюдаемое. Порядок потоковых данных не имеет значения.
Я пытался использовать операторы rxjs, такие как timer
, mergemap
, concatmap
, scan
, merge
. Но все еще новичок в этом, не мог понять, как это правильно сделать. Ниже приведена одна из таких попыток scan
, но она не смогла получить желаемых результатов, а массив имеет пустые значения и не знает, как использовать этот массив console.log
. Любые указатели были бы очень полезны, пожалуйста, предложите.
Попытка решения:
let temp: TestReply.AsObject[] = []; let test = this._MyTestService.coreData$ .pipe( mergeMap(_ =gt; timer(5000)), scanlt;anygt;((allResponses, currentResponse) =gt; [...allResponses, currentResponse], this.temp), ).subscribe(console.log);
Ответ №1:
Это мое решение, соедините все массивы вместе, используя следующий блок в подписке, а затем, наконец, выполните действия в полном блоке.
MyTestService.ts:
@Injectable() export class MyTestService { client: GrpcMyTestServiceClient; public readonly coreData$: Observablelt;TestReply.AsObjectgt;; constructor(private readonly http: HttpClient) { this.client = new GrpcMyTestServiceClient(environment.apiProxyUri); this.coreData$ = this.listCoreStream(); } listCoreStream(): Observablelt;TestReply.AsObjectgt; { return new Observable(obs =gt; { const req = new SomeRequest(); //Get stream data from service(grpc) const stream = this.client.getCoreUpdates(req); stream.on('data', (message: any) =gt; { obs.next(message.toObject() as TestReply.AsObject); }); }); stream.on('end', (message: any) =gt; { obs.complete(); }); }); }
МиКомпонент.ts
public testReply?: TestReply.AsObject; public dataArray = []; private _subscription: Subscription; constructor(private readonly _MyTestService: MyTestService) { this._subscription = new Subscription(); } ngOnInit(): void { this._subscription = this._MyTestService.coreData$.subscribe({ next: (data) =gt; { if (data) { this.dataArray.push(JSON.parse(data)); } }, complete: () =gt; { // this.dataArray // above will be your final data // processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc.. }) }
Комментарии:
1. Спасибо за вашу помощь, но так как мне нужно продолжать прослушивать поток службы из службы grpc, я не могу выполнить obs.complete() в конце потока (в приведенном ниже коде), который вы добавили..Я, делаю obs.error (), если что-то отключено от сети или необработанные проблемы, которые приводят к завершению потока. Таким образом, с этим я не попаду в полный блок внутри моей подписки на компонент, если я прав. stream.on(‘конец’, (сообщение: любое) =gt; { obs.ошибка(); // У меня здесь нет obs.завершения ()}); });
2. @mv_05 Никаких проблем, удачи!