#scala #http4s #fs2
Вопрос:
я нахожусь в своем путешествии, чтобы углубить свои знания в fs2, и хочу попробовать fs2-кафку для варианта использования, в котором я заменил бы поток akka. Идея проста: считывайте данные из кафки и отправляйте данные по http-запросу в приемник, а затем возвращайтесь к кафке в случае успеха. До сих пор я не могу разобраться в http-части. В akka stream / akka http у вас есть готовый поток для этого https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
Которые безупречно интегрируются с потоком akka.
Я пытался понять, могу ли я сделать что-то подобное с http4s и fs2 .
У кого-нибудь есть какая-либо ссылка, пример кода, блог и что еще, что показывает, как сделать такую интеграцию. До сих пор единственное, о чем я мог думать, — это обернуть поток в метод использования клиентского ресурса, т. е.
BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }
Даже тогда я не уверен во всем этом
Комментарии:
1.
someStrem.evalMap(data => sendHttpRequest(someClient, data))
— ГдеsomeClient
было создано подобноеStream.resource(SomeClientBuilder....build).flatMap { someClient => ??? }
? Или в чем был вопрос? как создать клиента? как выполнить http-запрос?2. Я думаю, вы почти ответили на мой вопрос. Я искал подтверждения тому, что хорошо понимаю, что нужно делать. Я работал с akka stream очень долгое время, и, как вы знаете, у них есть обширная документация и множество примеров. Это не относится к fs2 или fs2-кафке (для сравнения). Так что этот конкретный вариант использования-это то, что нужно выяснить самостоятельно, основываясь на том, насколько хорошо вы понимаете суть экосистемы.
3. Хотя я становлюсь лучше с экосистемой all cats-effect, я все еще учусь и все еще не чувствую себя уверенно на 100%. Поэтому я ищу готовый пример, подобный тому, что у вас есть в документации akka, чтобы подтвердить, что я иду правильным путем
4. Итак, вопрос заключался в том, как вы создаете клиента и используете его в своем потоке для фактической отправки данных, которые вы используете из Кафки ? Можете ли вы предоставить полную линию основной идеи, после этого я выясню все остальное. Спасибо
5. Но я думаю, что уже могу работать с тем, что вы предоставили 🙂
Ответ №1:
Дело в экосистеме уровня типов заключается в том, что все это просто библиотека, вам не нужны примеры того, как многие из них взаимодействуют друг с другом, вам просто нужно понять, как работает каждая библиотека и основные правила композиции.
def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
// Fill this based on the documentation of the client of your choice:
// I would recommend the ember client from http4s:
// https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder
}
def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
// Fill this based on the documentation of your client:
// https://http4s.org/v0.23/client/
// https://http4s.org/v0.23/api/org/http4s/client/client
}
def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
// Fill this based on the documentation of fs2-kafka:
// https://fd4s.github.io/fs2-kafka/docs/consumers
}
def program(/** whatever arguments you need */): Stream[IO, Unit] = {
// Based on the documentation of fs2 and fs2-kafka I would guess something like this:
Stream.fromResource(createClient(...)).flatMap { client =>
getStreamOfRecords(...).evalMapFilter { committable =>
sendHttpRequest(client)(data = committable.record).map { result =>
if (result.isSuccess) Some(committable.offset)
else None
}
}.through(commitBatchWithin(...))
}
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
program(...).compile.drain
}
Обратите внимание, что я написал все это у себя в голове, и, просто взглянув на документацию, вам нужно многое изменить (особенно типы, такие как Data
amp; Result
). А также настройка таких вещей, как обработка ошибок и время возврата к Кафке.
Тем не менее, я ожидаю, что это поможет вам получить представление о том, как структурировать свой код.
Комментарии:
1. Большое вам за это спасибо. Это помогло 🙂