Функция никогда не достигалась в сопоставлении источников Akka Streams

#scala #akka-stream

#scala #akka-stream

Вопрос:

Учитывая следующий код:

 class MigrationHandler @Inject()(database: Database, doUpdate: UpdateHandler)
                            (implicit @PropagateContext executor: ExecutionContext, actorSystem: ActorSystem)
extends Handler with Logging {
implicit val materializer: ActorMaterializer = ActorMaterializer()
val buttonTypeId = "someId1"
val promotionButtonTypeId = "someId2"
val typeId3 = "someId3"
val typeId4 = "someId4"
val typeIds = Seq[String](buttonTypeId, promotionButtonTypeId, typeId3, typeId4)

def apply(requestHeaders: Headers): Seq[Future[Done]] = {
    for {
        typeId <- typeIds
        result = database.allByType(typeId)
            .map(contents => contents.map(content => migrate(content, typeId))
                .map(migratedContent => doUpdate(ContentId(migratedContent.id),
                    NewContent(migratedContent.raw, isDefaultContent = false), requestHeaders))
                .runForeach(_ => ())).flatten
    } yield result
}

def migrate(content: SomeContent, typeId: String): SomeContent = {
    logger.info(s"$content with type $typeId")
    content
}}
  

Future[Source[SomeContent, _]] возвращается database.allByType(typeId)

В модульном тестировании, где я издеваюсь database.allByType(typeId) над return Source.single(SomeContent()) , я увидел, что не смог получить доступ к migrate функции. Есть идеи, в чем здесь может быть проблема?

Комментарии:

1. Вы уверены database.allByType(typeId) , что возвращает ожидаемое значение? Можете ли вы сохранить ее val , а затем зарегистрировать значение или проверить его в debug? Кроме того, похоже, что вы возвращаетесь Seq[Future[Unit]] из apply , в то время Done как ожидается?

2. @amorfis дааа, это была моя глупая ошибка. В тестовом примере я забыл получить значение futureValue из того, что было возвращено классом…