#scala #monix
Вопрос:
У меня есть следующий код, который рекурсивно повторяется и выполняет что-то по сети. По мере прохождения по сети я хотел бы выполнить некоторые оптимизации, где самой первой оптимизацией было бы избежать перехода по сети для определенных элементов, которые я уже пробовал.
Например. в приведенном ниже случае я вызываю URL-адрес, извлекаю ссылки, найденные в этом URL-адресе, вызываю эти URL-адреса и сообщаю о состоянии. Поскольку вполне возможно, что определенные URL-адреса могут быть извлечены снова, для тех URL-адресов, которые не удалось, я хотел бы добавить их в глобальное состояние, чтобы, когда я столкнусь с этим URL-адресом в следующий раз, я буду избегать этих сетевых вызовов.
Вот код:
def callURLWithCache(url: String): Task[HttpResult] = {
Task {
Http(url).timeout(connTimeoutMs = 1000, readTimeoutMs = 3000).asString
}.attempt.map {
case Left(err) =>
println(s"ERR happened ----------------- $url ************************ ${err.getMessage}")
// Add to the cache
val httpResult = HttpResult(source = url, isSuccess = false, statusCode = 1000, errorMessage = Some(err.getMessage))
val returnnnn: Try[Any] = httpResultErrorCache.put(url)(httpResult)
httpResult
case Right(doc) =>
if (doc.isError) {
HttpResult(source = url, isSuccess = doc.isSuccess, statusCode = doc.code)
} else {
val hrefs = (browser.parseString(doc.body) >> elementList("a[href]") >?> attr("href"))
.distinct.flatten.filter(_.startsWith("http"))
HttpResult(source = url, isSuccess = doc.isSuccess, statusCode = doc.code, elems = hrefs)
}
}
}
Вы можете видеть в левом блоке case (….), что я добавляю класс неудачного обращения в кэш, который я определяю глобально в заключающем классе этой функции как:
val underlyingCaffeineCache: cache.Cache[String, Entry[HttpResult]] = Caffeine.newBuilder().maximumSize(10000L).build[String, Entry[HttpResult]]
implicit val httpResultErrorCache: Cache[HttpResult] = CaffeineCache(underlyingCaffeineCache)
Вот функция, с помощью которой я выполняю рекурсивную операцию:
def parseSimpleWithFilter(filter: ParserFilter): Task[Seq[HttpResult]] = {
def parseInner(depth: Int, acc: HttpResult): Task[Seq[HttpResult]] = {
import cats.implicits._
if (depth > 0) {
val batched = acc.elems.collect {
case elem if httpResultErrorCache.get(elem).toOption.exists(_.isEmpty) =>
callURLWithCache(elem).flatMap(newElems => parseInner(depth - 1, newElems))
}.sliding(30).toSeq
.map(chunk => Task.parSequence(chunk))
Task.sequence(batched).map(_.flatten).map(_.flatten)
} else Task.pure(Seq(acc))
}
callURLWithCache(filter.url).map(elem => parseInner(filter.recursionDepth, elem)).flatten
}
Видно, что я проверяю, находится ли URL-адрес, который у меня есть в качестве текущего элемента, уже в кэше, что означает, что я уже пробовал его и потерпел неудачу, поэтому я хотел бы избежать повторного вызова HTTP для него.
Но происходит то, что кэш httpResultErrorCache всегда оказывается пустым. Я не уверен, что фрагмент задачи вызывает такое поведение. Есть какие-нибудь идеи о том, как заставить кэш работать?
Комментарии:
1. Похоже , вы кладете вещи в кэш только с
isSuccess=false
помощью, но толькоget
один разisSuccess=true
… Кроме того, похоже, что вы выполняете 30 задач параллельно, так что у них, скорее всего, не будет доступа к результатам друг друга.2. Я догадался, что виновником этого является параллелизм. Есть ли обходной путь для этого? Я думаю, что вы путаете с isSuccess в классе case с isSucess из попытки
3. О… да… Не используйте
Try
так, это чертовски сбивает с толку. Просто сделайif cache.get(elem).toOption.exists(_.isEmpty)
4. @Dima Передал бы состояние в качестве ссылки на callURLWithCache и вернул бы его в работу синтаксического анализа? А ты как думаешь?
5. Другое дело, уверены ли вы, что хотите использовать
sliding
, а неgrouped
? Похоже, вы обрабатываете один и тот же элемент много раз … Может быть, в этом и есть твоя проблема? Если вы видите несколько ошибок с одного и того же URL — адреса, это может быть причиной-два разных блока делают это параллельно …