Работа с состоянием и совместное использование с задачей Monix в рекурсивном цикле

#scala #monix

Вопрос:

У меня есть следующий код, который рекурсивно повторяется и выполняет что-то по сети. По мере прохождения по сети я хотел бы выполнить некоторые оптимизации, где самой первой оптимизацией было бы избежать перехода по сети для определенных элементов, которые я уже пробовал.

Например. в приведенном ниже случае я вызываю URL-адрес, извлекаю ссылки, найденные в этом URL-адресе, вызываю эти URL-адреса и сообщаю о состоянии. Поскольку вполне возможно, что определенные URL-адреса могут быть извлечены снова, для тех URL-адресов, которые не удалось, я хотел бы добавить их в глобальное состояние, чтобы, когда я столкнусь с этим URL-адресом в следующий раз, я буду избегать этих сетевых вызовов.

Вот код:

 def callURLWithCache(url: String): Task[HttpResult] = {
    Task {
      Http(url).timeout(connTimeoutMs = 1000, readTimeoutMs = 3000).asString
    }.attempt.map {
      case Left(err) =>
        println(s"ERR happened ----------------- $url ************************ ${err.getMessage}")
        // Add to the cache
        val httpResult = HttpResult(source = url, isSuccess = false, statusCode = 1000, errorMessage = Some(err.getMessage))
        val returnnnn: Try[Any] = httpResultErrorCache.put(url)(httpResult)
        httpResult
      case Right(doc) =>
        if (doc.isError) {
          HttpResult(source = url, isSuccess = doc.isSuccess, statusCode = doc.code)
        } else {
          val hrefs = (browser.parseString(doc.body) >> elementList("a[href]") >?> attr("href"))
            .distinct.flatten.filter(_.startsWith("http"))
          HttpResult(source = url, isSuccess = doc.isSuccess, statusCode = doc.code, elems = hrefs)
        }
    }
  }
 

Вы можете видеть в левом блоке case (….), что я добавляю класс неудачного обращения в кэш, который я определяю глобально в заключающем классе этой функции как:

 val underlyingCaffeineCache: cache.Cache[String, Entry[HttpResult]] = Caffeine.newBuilder().maximumSize(10000L).build[String, Entry[HttpResult]]
implicit val httpResultErrorCache: Cache[HttpResult] = CaffeineCache(underlyingCaffeineCache)
 

Вот функция, с помощью которой я выполняю рекурсивную операцию:

 def parseSimpleWithFilter(filter: ParserFilter): Task[Seq[HttpResult]] = {
    def parseInner(depth: Int, acc: HttpResult): Task[Seq[HttpResult]] = {
      import cats.implicits._
      if (depth > 0) {
        val batched = acc.elems.collect {
          case elem if httpResultErrorCache.get(elem).toOption.exists(_.isEmpty) =>
            callURLWithCache(elem).flatMap(newElems => parseInner(depth - 1, newElems))
        }.sliding(30).toSeq
          .map(chunk => Task.parSequence(chunk))
        Task.sequence(batched).map(_.flatten).map(_.flatten)
      } else Task.pure(Seq(acc))
    }
    callURLWithCache(filter.url).map(elem => parseInner(filter.recursionDepth, elem)).flatten
  }
 

Видно, что я проверяю, находится ли URL-адрес, который у меня есть в качестве текущего элемента, уже в кэше, что означает, что я уже пробовал его и потерпел неудачу, поэтому я хотел бы избежать повторного вызова HTTP для него.

Но происходит то, что кэш httpResultErrorCache всегда оказывается пустым. Я не уверен, что фрагмент задачи вызывает такое поведение. Есть какие-нибудь идеи о том, как заставить кэш работать?

Комментарии:

1. Похоже , вы кладете вещи в кэш только с isSuccess=false помощью, но только get один раз isSuccess=true … Кроме того, похоже, что вы выполняете 30 задач параллельно, так что у них, скорее всего, не будет доступа к результатам друг друга.

2. Я догадался, что виновником этого является параллелизм. Есть ли обходной путь для этого? Я думаю, что вы путаете с isSuccess в классе case с isSucess из попытки

3. О… да… Не используйте Try так, это чертовски сбивает с толку. Просто сделай if cache.get(elem).toOption.exists(_.isEmpty)

4. @Dima Передал бы состояние в качестве ссылки на callURLWithCache и вернул бы его в работу синтаксического анализа? А ты как думаешь?

5. Другое дело, уверены ли вы, что хотите использовать sliding , а не grouped ? Похоже, вы обрабатываете один и тот же элемент много раз … Может быть, в этом и есть твоя проблема? Если вы видите несколько ошибок с одного и того же URL — адреса, это может быть причиной-два разных блока делают это параллельно …