#android #kotlin #kotlin-flow
#Android #котлин #котлин-поток
Вопрос:
У меня есть следующий метод:
operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> = flow {
val jobDomainModelList = mutableListOf<JobDomainModel>()
jobListingRepository.searchJobs(sanitizeSearchQuery(query))
.collect { jobEntityList: List<JobEntity> ->
for (jobEntity in jobEntityList) {
categoriesRepository.getCategoryById(jobEntity.categoryId)
.collect { categoryEntity ->
if (categoryEntity.categoryId == jobEntity.categoryId) {
jobDomainModelList.add(jobEntity.toDomainModel(categoryEntity))
}
}
}
emit(jobDomainModelList)
}
}
Он выполняет поиск в репозитории , вызывая search
метод , который возвращает a Flow<List<JobEntity>>
. Затем для каждого JobEntity
в потоке мне нужно извлечь из базы данных категорию, к которой относится это задание. Как только у меня будет эта категория и задание, я могу преобразовать задание в объект модели предметной области ( JobDomainModel
) и добавить его в список, который будет возвращен в потоке как возвращаемый объект метода.
Проблема, с которой я сталкиваюсь, заключается в том, что ничего никогда не излучается. Я не уверен, что я что-то упускаю из виду при работе с потоками в Kotlin, но я не извлекаю категорию по ID ( categoriesRepository.getCategoryById(jobEntity.categoryId)
), тогда она работает нормально, и список генерируется.
Заранее большое спасибо!
Комментарии:
1. Просто чтобы убедиться:
searchJobs()
на самом деле это поток значений (например, WebSocket) или однократный запрос?2. Это тоже поток, поскольку он выполняет поиск в БД с использованием Room через DAO
3. Я также пробовал map вместо collect (при выборе категории для задания), но он по-прежнему не работает
Ответ №1:
Я думаю, проблема в том, что вы собираете потоки бесконечной длины, поэтому collect
никогда не возвращаетесь. Вы должны использовать .take(1)
, чтобы получить конечный поток, прежде чем собирать его, или использовать first()
.
Потоки, возвращаемые вашим DAO, имеют бесконечную длину. Первое значение — это первый выполненный запрос, но поток будет продолжаться вечно, пока не будет отменен. Каждый элемент в потоке представляет собой новый запрос, выполняемый при изменении содержимого базы данных.
Что-то вроде этого:
operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> =
jobListingRepository.searchJobs(sanitizeSearchQuery(query))
.map { jobEntityList: List<JobEntity> ->
jobEntityList.mapNotNull { jobEntity ->
categoriesRepository.getCategoryById(jobEntity.categoryId)
.first()
.takeIf { it.categoryId == jobEntity.categoryId }
}
}
В качестве альтернативы, в вашем DAO вы могли бы создать suspend
версию функции getCategoryById()
, которая просто возвращает список.