#kotlin #caching #kotlin-coroutines #caffeine
#kotlin #кэширование #kotlin-сопрограммы #кофеин
Вопрос:
У меня есть серверное приложение Kotlin JVM, использующее сопрограммы, и мне нужно поместить кэш перед неблокирующим сетевым вызовом. Я полагаю, что могу использовать Caffeine AsyncLoadingCache
, чтобы добиться нужного мне неблокирующего поведения кэша. AsyncCacheLoader
Интерфейс, который мне понадобился бы для реализации, использует CompletableFuture
. Между тем, метод, который я хочу вызвать для загрузки записей кэша, является suspend
функцией.
Я могу устранить пробел следующим образом:
abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
abstract suspend fun load(key: K): V
final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
return GlobalScope.async(executor.asCoroutineDispatcher()) {
load(key)
}.asCompletableFuture()
}
}
Это приведет к запуску load
функции на предоставленном Executor
(по умолчанию, ForkJoinPool
), что с точки зрения Caffeine является правильным поведением.
Однако я знаю, что мне следует стараться избегать использования GlobalScope для запуска сопрограмм.
Я подумал о том, чтобы SuspendingCacheLoader
реализовать CoroutineScope
свою собственную сопрограмму и управлять ее собственным контекстом. Но CoroutineScope
предназначен для реализации объектами с управляемым жизненным циклом. Ни у кэша, ни у AsyncCacheLoader
нет перехватов жизненного цикла. Кэш владеет Executor
и CompletableFuture
экземплярами, поэтому он уже управляет жизненным циклом задач загрузки таким образом. Я не вижу, чтобы принадлежность задач контексту сопрограммы что-то добавляла, и я беспокоюсь, что я не смогу корректно закрыть контекст сопрограммы после того, как кэш перестанет использоваться.
Написать свой собственный механизм асинхронного кэширования было бы непомерно сложно, поэтому я хотел бы интегрироваться с реализацией Caffeine, если смогу.
Используется GlobalScope
правильный подход к реализации AsyncCacheLoader
или есть лучшее решение?
Ответ №1:
После некоторых размышлений я пришел к гораздо более простому решению, которое, как мне кажется, использует сопрограммы более идиоматично.
Этот подход работает за счет использования AsyncCache.get(key, mappingFunction)
вместо реализации AsyncCacheLoader
. Однако он игнорирует Executor
, на использование которого настроен кэш, следуя советам некоторых других ответов здесь.
class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
suspend fun get(key: K): V = supervisorScope {
getAsync(key).await()
}
private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
future {
loadValue(k)
}
}
private suspend fun loadValue(key: K): V = TODO("Load the value")
}
Обратите внимание, что это зависит от kotlinx-coroutines-jdk8
для future
разработчика сопрограмм и await()
функции.
Я думаю, что игнорирование Executor
, вероятно, является правильным выбором. Как указывает @Kiskae, по умолчанию в кэше будет использоваться ForkJoinPool
. Выбор в пользу этого, а не диспетчера сопрограмм по умолчанию, вероятно, бесполезен. Однако было бы легко использовать его, если бы мы захотели, изменив getAsync
функцию:
private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
future(executor.asCoroutineDispatcher()) {
loadValue(k)
}
}
Комментарии:
1. Отлично работает для
get
, но как сделать ключ недействительным?2. Чтобы сделать ключ недействительным, вы могли бы добавить другой метод, который вызывает через
asyncCache.synchronous().invalidate(key)
. Вы можете сделать то же самое для любых других изменений, которые вам необходимо внести в базовый кэш.3. К сожалению, это метод блокировки. Однако его можно отправить в пул ввода-вывода.
4. Будьте осторожны, если
coroutineContext
используется вместоsupervisorContext
, это приведет к проблемам с исключениями. Я чувствую, что это безопаснее использоватьGlobalScope
Ответ №2:
Кэш владеет экземплярами Executor и CompletableFuture, поэтому он уже управляет жизненным циклом задач загрузки таким образом.
Это неверно, в документации на Caffeine
указано, что он использует предоставленный пользователем Executor
или ForkJoinPool.commonPool()
, если таковой не предоставлен. Это означает, что жизненного цикла по умолчанию не существует.
Несмотря на это, прямой вызов GlobalScope
кажется неправильным решением, потому что нет причин жестко кодировать выбор. Просто укажите a CoroutineScope
через конструктор и используйте GlobalScope
в качестве аргумента, пока у вас нет явного жизненного цикла для привязки кэша.
Комментарии:
1. Спасибо за ответ и за разъяснения по поводу исполнителя Caffeine. Мне нравится идея принятия
CoroutineScope
в конструкторе. Если бы я в конечном итоге передалCoroutineScope
из более широкого приложения, какие преимущества это могло бы мне дать? Я полагаю, что в рамкахAsyncCacheLoader
я бы хотел переопределить какDispatcher
(используя предоставленныйExecutor
), так иJob
(поскольку для сбоев загрузки кэша не имело бы смысла распространяться за пределы кэша). Как только это будет сделано, кажется, что от исходного контекста останется не так уж много.2. Я не понимаю, какие преимущества вы получили бы от переданной области сопрограммы, поскольку вы явно работаете за пределами мира сопрограмм, и у ваших абонентов нет этого под рукой.
Ответ №3:
Вот простое решение. Замените обозначения K, V на свой тип.
val cache = Caffeine.newBuilder().buildAsync<K, V> { key: K, _ ->
val future = CompletableFuture<V>()
launch {
val result = someAwaitOperation(key)
future.complete(result)
}
future
}
Комментарии:
1. нет необходимости делать все это, поскольку future creator уже доступен
Ответ №4:
Предложите метод расширения, подобный этому
suspend inline fun <K: Any, V: Any> Caffeine<Any, Any>.suspendingLoadingCache(
crossinline suspendedLoader: suspend (key: K) -> V
): AsyncLoadingCache<K, V> =
buildAsync { key, executor: Executor ->
CoroutineScope(executor.asCoroutineDispatcher()).future {
suspendedLoader(key)
}
}
Не рекомендую GlobalScope
, используйте CoroutineScope(executor.asCoroutineDispatcher())
future
метод определен в kotlinx-coroutines-jdk8
модуле
Ответ №5:
Вот мое решение:
Определите функцию расширения CoroutineVerticle
fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
// do not use cache's executor
future {
loader(key)
}
}
Создаем наш кэш внутри CoroutineVerticle
val cache : AsyncLoadingCache<String, String> = buildCache({
maximumSize(10_000)
expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
// load data and return it
delay(1000)
"data for key: $key"
}
Используйте кэш
suspend fun doSomething() {
val data = cache.get('key').await()
val future = cache.get('key2')
val data2 = future.await()
}