Путешествие из akka-потока в fs2 — как определить http-поток akka-потока, подобный этапу в fs2, с использованием http4s

#scala #http4s #fs2

Вопрос:

я нахожусь в своем путешествии, чтобы углубить свои знания в fs2, и хочу попробовать fs2-кафку для варианта использования, в котором я заменил бы поток akka. Идея проста: считывайте данные из кафки и отправляйте данные по http-запросу в приемник, а затем возвращайтесь к кафке в случае успеха. До сих пор я не могу разобраться в http-части. В akka stream / akka http у вас есть готовый поток для этого https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool

 Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
 

Которые безупречно интегрируются с потоком akka.

Я пытался понять, могу ли я сделать что-то подобное с http4s и fs2 .

У кого-нибудь есть какая-либо ссылка, пример кода, блог и что еще, что показывает, как сделать такую интеграцию. До сих пор единственное, о чем я мог думать, — это обернуть поток в метод использования клиентского ресурса, т. е.

 BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }
 

Даже тогда я не уверен во всем этом

Комментарии:

1. someStrem.evalMap(data => sendHttpRequest(someClient, data)) — Где someClient было создано подобное Stream.resource(SomeClientBuilder....build).flatMap { someClient => ??? } ? Или в чем был вопрос? как создать клиента? как выполнить http-запрос?

2. Я думаю, вы почти ответили на мой вопрос. Я искал подтверждения тому, что хорошо понимаю, что нужно делать. Я работал с akka stream очень долгое время, и, как вы знаете, у них есть обширная документация и множество примеров. Это не относится к fs2 или fs2-кафке (для сравнения). Так что этот конкретный вариант использования-это то, что нужно выяснить самостоятельно, основываясь на том, насколько хорошо вы понимаете суть экосистемы.

3. Хотя я становлюсь лучше с экосистемой all cats-effect, я все еще учусь и все еще не чувствую себя уверенно на 100%. Поэтому я ищу готовый пример, подобный тому, что у вас есть в документации akka, чтобы подтвердить, что я иду правильным путем

4. Итак, вопрос заключался в том, как вы создаете клиента и используете его в своем потоке для фактической отправки данных, которые вы используете из Кафки ? Можете ли вы предоставить полную линию основной идеи, после этого я выясню все остальное. Спасибо

5. Но я думаю, что уже могу работать с тем, что вы предоставили 🙂

Ответ №1:

Дело в экосистеме уровня типов заключается в том, что все это просто библиотека, вам не нужны примеры того, как многие из них взаимодействуют друг с другом, вам просто нужно понять, как работает каждая библиотека и основные правила композиции.

 def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
  // Fill this based on the documentation of the client of your choice:
  // I would recommend the ember client from http4s:
  // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder 
}


def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
  // Fill this based on the documentation of your client:
  // https://http4s.org/v0.23/client/
  // https://http4s.org/v0.23/api/org/http4s/client/client
}

def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  // Fill this based on the documentation of fs2-kafka:
  // https://fd4s.github.io/fs2-kafka/docs/consumers
}

def program(/** whatever arguments you need */): Stream[IO, Unit] = {
  // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
  Stream.fromResource(createClient(...)).flatMap { client =>
    getStreamOfRecords(...).evalMapFilter { committable =>
      sendHttpRequest(client)(data = committable.record).map { result =>
        if (result.isSuccess) Some(committable.offset)
        else None
      }
    }.through(commitBatchWithin(...))
  }
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    program(...).compile.drain
}
 

Обратите внимание, что я написал все это у себя в голове, и, просто взглянув на документацию, вам нужно многое изменить (особенно типы, такие как Data amp; Result ). А также настройка таких вещей, как обработка ошибок и время возврата к Кафке.
Тем не менее, я ожидаю, что это поможет вам получить представление о том, как структурировать свой код.

Комментарии:

1. Большое вам за это спасибо. Это помогло 🙂