Android: сбор потока Kotlin внутри другого, не излучающего

#android #kotlin #kotlin-flow

#Android #котлин #котлин-поток

Вопрос:

У меня есть следующий метод:

     operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> = flow {
        val jobDomainModelList = mutableListOf<JobDomainModel>()
        jobListingRepository.searchJobs(sanitizeSearchQuery(query))
            .collect { jobEntityList: List<JobEntity> ->
                for (jobEntity in jobEntityList) {
                    categoriesRepository.getCategoryById(jobEntity.categoryId)
                        .collect { categoryEntity ->
                            if (categoryEntity.categoryId == jobEntity.categoryId) {
                                jobDomainModelList.add(jobEntity.toDomainModel(categoryEntity))
                            }
                        }
                }
                emit(jobDomainModelList)
            }
    }
 

Он выполняет поиск в репозитории , вызывая search метод , который возвращает a Flow<List<JobEntity>> . Затем для каждого JobEntity в потоке мне нужно извлечь из базы данных категорию, к которой относится это задание. Как только у меня будет эта категория и задание, я могу преобразовать задание в объект модели предметной области ( JobDomainModel ) и добавить его в список, который будет возвращен в потоке как возвращаемый объект метода.

Проблема, с которой я сталкиваюсь, заключается в том, что ничего никогда не излучается. Я не уверен, что я что-то упускаю из виду при работе с потоками в Kotlin, но я не извлекаю категорию по ID ( categoriesRepository.getCategoryById(jobEntity.categoryId) ), тогда она работает нормально, и список генерируется.

Заранее большое спасибо!

Комментарии:

1. Просто чтобы убедиться: searchJobs() на самом деле это поток значений (например, WebSocket) или однократный запрос?

2. Это тоже поток, поскольку он выполняет поиск в БД с использованием Room через DAO

3. Я также пробовал map вместо collect (при выборе категории для задания), но он по-прежнему не работает

Ответ №1:

Я думаю, проблема в том, что вы собираете потоки бесконечной длины, поэтому collect никогда не возвращаетесь. Вы должны использовать .take(1) , чтобы получить конечный поток, прежде чем собирать его, или использовать first() .

Потоки, возвращаемые вашим DAO, имеют бесконечную длину. Первое значение — это первый выполненный запрос, но поток будет продолжаться вечно, пока не будет отменен. Каждый элемент в потоке представляет собой новый запрос, выполняемый при изменении содержимого базы данных.

Что-то вроде этого:

 operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> =
    jobListingRepository.searchJobs(sanitizeSearchQuery(query))
        .map { jobEntityList: List<JobEntity> ->
            jobEntityList.mapNotNull { jobEntity ->
                categoriesRepository.getCategoryById(jobEntity.categoryId)
                    .first()
                    .takeIf { it.categoryId == jobEntity.categoryId }
            }
        }
 

В качестве альтернативы, в вашем DAO вы могли бы создать suspend версию функции getCategoryById() , которая просто возвращает список.