Напишите модульный тест для параллельного потока в реакторе проекта

#java #unit-testing #reactive-programming #project-reactor #reactive

#Ява #модульное тестирование #реактивное программирование #проект-реактор #реактивный

Вопрос:

У меня есть реактивная цепь, как показано ниже:

 public Fluxlt;TestObjectgt; sendData(){   return Flux.fromIterable(o.getC())  .parallel(threadCap)  .runOn(Schedulers.parallel())  .filter(c -gt; filterC())  .doOnNext(c -gt; log.debug("C Name :{} on thread {}", c.getName(), Thread.currentThread().getName()))  .map(c -gt; c.getId().trim())  .flatMap(cId -gt; Flux.fromIterable(getL(l, cId))  .parallel(threadCap)  .runOn(Schedulers.parallel())  .flatMap(l -gt; Flux.fromIterable(getM(cId))  .parallel(threadCap)  .runOn(Schedulers.parallel())  .flatMap(m -gt; {  String q = "MessageFormat.format("{0}[{1}]", l.getId(), cId)";  Listlt;Datagt; data = o.getData(q);  return Flux.fromIterable(data)  .parallel(threadCap)  .runOn(Schedulers.parallel())  .flatMap(d -gt;  {  Fluxlt;TestObjectgt; testObject = Flux.just(writeData(d, q));  return testObject;  });  }))  )  .doOnError(throwable -gt; {  log.error("Error while reading data : {} ", throwable.getMessage());  return;  })  .sequential(); }  

Я пытаюсь написать модульные тесты для вышеуказанного метода. Это то, что у меня есть сейчас:

 Fluxlt;TestObjectgt; actual = sendData();  StepVerifier.create(actual)  .expectSubscription()  .expectNext(expectedTestObject)  .verifyComplete();  

Я хочу проверить, что возвращается ожидаемый объект TestObject, но на самом деле он просто завершает выполнение, и тест завершается неудачно, а сообщение actual является полным. Я считаю, что это связано с тем, что я добавил параллелизм, и он не ждет завершения выполнения. У кого-нибудь есть какие-либо указания о том, как я могу выполнить тест, чтобы дождаться завершения выполнения, чтобы проверить возвращенное значение объекта?