Rxjs: Объединение потоков наблюдаемых данных(служба grpc) в массив через X секунд с помощью сканирования или карты слияния или любого rxjs

#angular #typescript #rxjs #rxjs-observables #rxjs-pipeable-operators

Вопрос:

Я получаю поток данных в качестве ответа от службы(grpc), например, 5 потоков в течение 2 секунд. Каждый поток — это наблюдаемый объект, на который я подписываюсь. Логика обработки, как только я получаю каждый поток, становится больше, и в каждом потоке есть какой-то тяжелый объект json, который содержит байтовую строку в кодировке base64, включает преобразование json в сложные типы и более условную логику. Таким образом, к тому времени, когда логика обработки для одного потока завершена, я не мог проверить другие потоки, так как они обрабатываются очень быстро (близко, как параллельные) и пропускаются. Поэтому мне нужно объединить все эти потоки Y(например, 5) после прослушивания в течение X секунд в наблюдаемый массив. Ниже приведен фрагмент кода.

MyTestService.ts:

 @Injectable()  export class MyTestService {   client: GrpcMyTestServiceClient;  public readonly coreData$: Observablelt;TestReply.AsObjectgt;;   constructor(private readonly http: HttpClient) {  this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);  this.coreData$ = this.listCoreStream();  }  listCoreStream(): Observablelt;TestReply.AsObjectgt; {   return new Observable(obs =gt; {   const req = new SomeRequest();   //Get stream data from service(grpc)  const stream = this.client.getCoreUpdates(req);    stream.on('data', (message: any) =gt; {   obs.next(message.toObject() as TestReply.AsObject);  });  }); }  

МиКомпонент.ts

 public testReply?: TestReply.AsObject;  private _subscription: Subscription;  constructor(private readonly _MyTestService: MyTestService) {   this._subscription = new Subscription(); }   ngOnInit(): void {   this._subscription = this._MyTestService.coreData$.subscribe((data) =gt; {  if (data) {  let obj = JSON.parse(data);  //processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..  }  });  }  

Поскольку данные слишком быстры за короткий промежуток времени, не все записи или обработаны. Это похоже на проблему синхронизации, которая невозможна в веб-мире, где последний поток, удовлетворяющий некоторому условию, перезаписывает предыдущий поток. Чтобы избежать этого и обработать весь поток по одному за раз, мне нужно объединить/объединить все потоки в массив, чтобы я мог повторить каждый из них внутри компонента, подписывающегося на наблюдаемое. Порядок потоковых данных не имеет значения.

Я пытался использовать операторы rxjs, такие как timer , mergemap , concatmap , scan , merge . Но все еще новичок в этом, не мог понять, как это правильно сделать. Ниже приведена одна из таких попыток scan , но она не смогла получить желаемых результатов, а массив имеет пустые значения и не знает, как использовать этот массив console.log . Любые указатели были бы очень полезны, пожалуйста, предложите.

Попытка решения:

 let temp: TestReply.AsObject[] = []; let test = this._MyTestService.coreData$  .pipe(  mergeMap(_ =gt; timer(5000)),  scanlt;anygt;((allResponses, currentResponse) =gt;  [...allResponses, currentResponse], this.temp),  ).subscribe(console.log);  

Ответ №1:

Это мое решение, соедините все массивы вместе, используя следующий блок в подписке, а затем, наконец, выполните действия в полном блоке.

MyTestService.ts:

 @Injectable()  export class MyTestService {   client: GrpcMyTestServiceClient;  public readonly coreData$: Observablelt;TestReply.AsObjectgt;;   constructor(private readonly http: HttpClient) {  this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);  this.coreData$ = this.listCoreStream();  }  listCoreStream(): Observablelt;TestReply.AsObjectgt; {   return new Observable(obs =gt; {   const req = new SomeRequest();   //Get stream data from service(grpc)  const stream = this.client.getCoreUpdates(req);    stream.on('data', (message: any) =gt; {   obs.next(message.toObject() as TestReply.AsObject);  });  });   stream.on('end', (message: any) =gt; {   obs.complete();  });  }); }  

МиКомпонент.ts

 public testReply?: TestReply.AsObject;  public dataArray = []; private _subscription: Subscription;  constructor(private readonly _MyTestService: MyTestService) {   this._subscription = new Subscription(); }   ngOnInit(): void {   this._subscription = this._MyTestService.coreData$.subscribe({  next: (data) =gt; {  if (data) {  this.dataArray.push(JSON.parse(data));  }   },  complete: () =gt; {  // this.dataArray  // above will be your final data  // processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..  })  }  

Комментарии:

1. Спасибо за вашу помощь, но так как мне нужно продолжать прослушивать поток службы из службы grpc, я не могу выполнить obs.complete() в конце потока (в приведенном ниже коде), который вы добавили..Я, делаю obs.error (), если что-то отключено от сети или необработанные проблемы, которые приводят к завершению потока. Таким образом, с этим я не попаду в полный блок внутри моей подписки на компонент, если я прав. stream.on(‘конец’, (сообщение: любое) =gt; { obs.ошибка(); // У меня здесь нет obs.завершения ()}); });

2. @mv_05 Никаких проблем, удачи!