Механизм кэширования активных запросов (сопрограммы)

#android #kotlin #caching #kotlin-coroutines

Вопрос:

Мне нужно было реализовать кэширование активных запросов, и я сделал это следующим образом:

 private val requestJobCache: MutableMap<RequestBody, Deferred<Response>> = mutableMapOf()

suspend fun fetch(body: RequestBody): Response {
    // ... some code
}

suspend fun get(body: RequestBody): Response {
    if (requestJobCache.containsKey(body)) {
        return requestJobCache[body]!!.await()
    }

    return coroutineScope {
        try {
            val request = async { fetch(body) }

            requestJobCache[body] = request
            return@coroutineScope request.await()
        }
        finally {
            val finishedRequestJob = requestJobCache.remove(body)

            // ...
            // what does finishedRequestJob?.isCompleted equal?
        }
    }
}
 

Этот код работает, но есть одна странная вещь: теоретически finishedRequestJob?.IsCompleted он всегда должен возвращаться true , потому что этот код выполняется в finally блоке после того , как асинхронная функция получила результат. Однако на практике этот метод иногда возвращается false .

Почему это может произойти, где я допустил ошибку в своих рассуждениях и как это правильно реализовать?

Комментарии:

1. Вам действительно следует синхронизировать доступ к requestJobCache . Я не уверен, что это то, что происходит здесь, но обратите внимание, что, например, между requestJobCache.containsKey(body) и requestJobCache[body]!!.await() другим потоком/сопрограммой может remove() появиться элемент, вызывающий NPE. Аналогично, несколько сопрограмм могут проверять containsKey() наличие одного и того же элемента, затем обе они запустят запрос, а затем finishedRequestJob?.isCompleted вернут один из них false .

2. @брут, хорошая мысль! Исходная реализация использует ConcurrentHashMap , но я не знаю, насколько идиоматичен этот метод для kotlin, я собираюсь найти/задать отдельный вопрос об этом.

3. Использование ConcurrentHashMap не решает эту ситуацию, потому что проблема возникает между containsKey() и requestJobCache[body] (как получение, так и настройка). Вам нужно было бы использовать мьютекс или подобную технику, но coroutineScope() это немного усложняет задачу.

4. С какими подводными coroutineScope камнями это связано ? Разве я не мог бы просто использовать withLock {} везде, где работаю с коллекцией?

5. Послушайте, мьютекс-это не волшебная вещь, которая автоматически делает код потокобезопасным. Если вы поместите containsKey() внутри withLock() и requestJobCache[body] внутри другого withLock() , то у вас все равно будет точно такая же проблема. Дело в том, что вы должны убедиться, что никто не будет касаться requestJobCache между проверкой containsKey() и извлечением/добавлением из/в него. Вам нужно поместить оба containsKey() и requestJobCache[body] в один withLock() блок. Проблема в том , что мы не можем приостановить работу внутри withLock() , потому что мы заблокировали бы всех, кто пытается использовать get() функцию.

Ответ №1:

Одной из возможных причин этого является то, что доступ к requestJobCache не синхронизирован. Например, между requestJobCache.containsKey(body) и requestJobCache[body]!!.await() другим потоком/сопрограммой может remove() быть элемент, вызывающий NullPointerException . Аналогично, несколько сопрограмм могут проверять containsKey() наличие одного и того же элемента, затем все они начнут запрос, а затем finishedRequestJob?.isCompleted для некоторых из них вернутся false .

Чтобы исправить это, нам нужно синхронизировать доступ к requestJobCache . В данном случае это немного сложно из-за приостановки и coroutineScope() , но мы должны быть в состоянии решить эти проблемы с помощью кода , подобного этому:

 private val requestJobCache: MutableMap<RequestBody, CompletableDeferred<Response>> = mutableMapOf()
private val requestJobCacheLock = Mutex()

@Suppress("DeferredResultUnused")
suspend fun get(body: RequestBody): Response {
    val (existed, deferred) = requestJobCacheLock.withLock {
        requestJobCache[body]
            ?.let { true to it }
            ?: (false to CompletableDeferred<Response>().also {
                requestJobCache[body] = it
            })
    }

    return if (existed) {
        deferred.await()
    } else {
        val result = runCatching { fetch(body) }
        deferred.completeWith(result)

        requestJobCacheLock.withLock {
            requestJobCache.remove(body)
        }

        result.getOrThrow()
    }
}
 

Обратите внимание, что у меня не было возможности протестировать этот код, поэтому он может содержать ошибки. Надеюсь, вы поняли мою идею.