Как мне использовать асинхронный кэш с сопрограммами Kotlin?

#kotlin #caching #kotlin-coroutines #caffeine

#kotlin #кэширование #kotlin-сопрограммы #кофеин

Вопрос:

У меня есть серверное приложение Kotlin JVM, использующее сопрограммы, и мне нужно поместить кэш перед неблокирующим сетевым вызовом. Я полагаю, что могу использовать Caffeine AsyncLoadingCache , чтобы добиться нужного мне неблокирующего поведения кэша. AsyncCacheLoader Интерфейс, который мне понадобился бы для реализации, использует CompletableFuture . Между тем, метод, который я хочу вызвать для загрузки записей кэша, является suspend функцией.

Я могу устранить пробел следующим образом:

 abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
        return GlobalScope.async(executor.asCoroutineDispatcher()) {
            load(key)
        }.asCompletableFuture()
    }
}
  

Это приведет к запуску load функции на предоставленном Executor (по умолчанию, ForkJoinPool ), что с точки зрения Caffeine является правильным поведением.

Однако я знаю, что мне следует стараться избегать использования GlobalScope для запуска сопрограмм.

Я подумал о том, чтобы SuspendingCacheLoader реализовать CoroutineScope свою собственную сопрограмму и управлять ее собственным контекстом. Но CoroutineScope предназначен для реализации объектами с управляемым жизненным циклом. Ни у кэша, ни у AsyncCacheLoader нет перехватов жизненного цикла. Кэш владеет Executor и CompletableFuture экземплярами, поэтому он уже управляет жизненным циклом задач загрузки таким образом. Я не вижу, чтобы принадлежность задач контексту сопрограммы что-то добавляла, и я беспокоюсь, что я не смогу корректно закрыть контекст сопрограммы после того, как кэш перестанет использоваться.

Написать свой собственный механизм асинхронного кэширования было бы непомерно сложно, поэтому я хотел бы интегрироваться с реализацией Caffeine, если смогу.

Используется GlobalScope правильный подход к реализации AsyncCacheLoader или есть лучшее решение?

Ответ №1:

После некоторых размышлений я пришел к гораздо более простому решению, которое, как мне кажется, использует сопрограммы более идиоматично.

Этот подход работает за счет использования AsyncCache.get(key, mappingFunction) вместо реализации AsyncCacheLoader . Однако он игнорирует Executor , на использование которого настроен кэш, следуя советам некоторых других ответов здесь.

 class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
    suspend fun get(key: K): V = supervisorScope {
        getAsync(key).await()
    }

    private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
        future { 
            loadValue(k) 
        }
    }

    private suspend fun loadValue(key: K): V = TODO("Load the value")
}
  

Обратите внимание, что это зависит от kotlinx-coroutines-jdk8 для future разработчика сопрограмм и await() функции.

Я думаю, что игнорирование Executor , вероятно, является правильным выбором. Как указывает @Kiskae, по умолчанию в кэше будет использоваться ForkJoinPool . Выбор в пользу этого, а не диспетчера сопрограмм по умолчанию, вероятно, бесполезен. Однако было бы легко использовать его, если бы мы захотели, изменив getAsync функцию:

 private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
    future(executor.asCoroutineDispatcher()) { 
        loadValue(k) 
    }
}

  

Комментарии:

1. Отлично работает для get , но как сделать ключ недействительным?

2. Чтобы сделать ключ недействительным, вы могли бы добавить другой метод, который вызывает через asyncCache.synchronous().invalidate(key) . Вы можете сделать то же самое для любых других изменений, которые вам необходимо внести в базовый кэш.

3. К сожалению, это метод блокировки. Однако его можно отправить в пул ввода-вывода.

4. Будьте осторожны, если coroutineContext используется вместо supervisorContext , это приведет к проблемам с исключениями. Я чувствую, что это безопаснее использовать GlobalScope

Ответ №2:

Кэш владеет экземплярами Executor и CompletableFuture, поэтому он уже управляет жизненным циклом задач загрузки таким образом.

Это неверно, в документации на Caffeine указано, что он использует предоставленный пользователем Executor или ForkJoinPool.commonPool() , если таковой не предоставлен. Это означает, что жизненного цикла по умолчанию не существует.

Несмотря на это, прямой вызов GlobalScope кажется неправильным решением, потому что нет причин жестко кодировать выбор. Просто укажите a CoroutineScope через конструктор и используйте GlobalScope в качестве аргумента, пока у вас нет явного жизненного цикла для привязки кэша.

Комментарии:

1. Спасибо за ответ и за разъяснения по поводу исполнителя Caffeine. Мне нравится идея принятия CoroutineScope в конструкторе. Если бы я в конечном итоге передал CoroutineScope из более широкого приложения, какие преимущества это могло бы мне дать? Я полагаю, что в рамках AsyncCacheLoader я бы хотел переопределить как Dispatcher (используя предоставленный Executor ), так и Job (поскольку для сбоев загрузки кэша не имело бы смысла распространяться за пределы кэша). Как только это будет сделано, кажется, что от исходного контекста останется не так уж много.

2. Я не понимаю, какие преимущества вы получили бы от переданной области сопрограммы, поскольку вы явно работаете за пределами мира сопрограмм, и у ваших абонентов нет этого под рукой.

Ответ №3:

Вот простое решение. Замените обозначения K, V на свой тип.

     val cache = Caffeine.newBuilder().buildAsync<K, V> { key: K, _ ->
      val future = CompletableFuture<V>()

      launch {
        val result = someAwaitOperation(key)
        future.complete(result)
      }

      future
    }
  

Комментарии:

1. нет необходимости делать все это, поскольку future creator уже доступен

Ответ №4:

Предложите метод расширения, подобный этому

 suspend inline fun <K: Any, V: Any> Caffeine<Any, Any>.suspendingLoadingCache(
    crossinline suspendedLoader: suspend (key: K) -> V
): AsyncLoadingCache<K, V> =
    buildAsync { key, executor: Executor ->
        CoroutineScope(executor.asCoroutineDispatcher()).future {
            suspendedLoader(key)
        }
    }
  

Не рекомендую GlobalScope , используйте CoroutineScope(executor.asCoroutineDispatcher())

future метод определен в kotlinx-coroutines-jdk8 модуле

Ответ №5:

Вот мое решение:

Определите функцию расширения CoroutineVerticle

 fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
    // do not use cache's executor
    future {
        loader(key)
    }
}
  

Создаем наш кэш внутри CoroutineVerticle

 val cache : AsyncLoadingCache<String, String> = buildCache({
  maximumSize(10_000)
  expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
    // load data and return it
    delay(1000)
    "data for key: $key"
}
  

Используйте кэш

 suspend fun doSomething() {
    val data = cache.get('key').await()

    val future = cache.get('key2')
    val data2 = future.await()
}