Как динамически применять запланированный потребитель kafka на основе темы?

#java #apache-kafka

#java #apache-kafka

Вопрос:

В настоящее время я борюсь с потребителем в kafka, который может каким-то образом запланировать выполнение на будущее.

Подводя итог: у меня есть хранилище больших данных (файл .csv), и записи содержат 2 столбца: временную метку и значение. Я пытаюсь обработать эти значения на основе их метки времени. Первая запись должна быть немедленно использована kafka, следующая должна быть обработана в будущем с задержкой «временная метка текущей записи — временная метка предыдущей записи» (это не очень большая разница, всего несколько секунд = результат будет в миллисекундах) и так далее.

Поэтому, по сути, я не могу найти решение для реализации потребителя в kafka, который принимает каждую запись на основе метки времени и использует эту точную задержку. Мне нужно просто смоделировать эти значения, и они должны быть вставлены в БД в соответствии с этой задержкой, чтобы работать должным образом.

Я пытался обходить потоки с исполнителями, но для больших данных это неправильный способ.

Я попытался создать динамические темы для производителей на основе метки времени, а затем подписаться на них, а затем каким-то образом обработать с помощью очереди. Это не сработало.

Я ожидаю, что kafka будет использовать каждую запись с задержкой, основанной на отметке времени.

Комментарии:

1. Пожалуйста, обратите внимание, что Kafka получает (использует) сообщения от Producer . Потребитель Kafka просто считывает сообщения из Kafka и обрабатывает поток сообщений независимо от задержки между ними.

2. Если вы хотите задержать потребителя, почему бы вам просто не добавить a Thread.sleep() в цикл потребителей while на основе ранее прочитанного значения?

3. Потому что у меня есть несколько идентичных наборов данных, которые должны выполняться одновременно (или очень близко друг к другу) в системной временной метке. Если я просто переведу поток в режим ожидания, 2 идентичных значения будут иметь разные временные метки, даже не близко друг к другу.

Ответ №1:

Я ожидаю, что kafka будет использовать каждую запись с задержкой, основанной на отметке времени

Если у вас есть определенная задержка между сообщениями, то Kafka не является подходящим решением. Когда вы отправляете сообщения в Kafka, в большинстве сценариев вы используете сеть. Что может добавить свою собственную, непредсказуемую задержку. Kafka выполняется как другой процесс, и никто не может гарантировать, в какой момент этот процесс будет готов к приему следующего сообщения. ОС может приостановить процесс, GC может запуститься и т.д. Это добавляет еще одну задержку, которую никто не мог предсказать.
Кроме того, Kafka не предназначен для работы со временем получения сообщения. Он больше ориентирован на порядок сообщений, низкую задержку и высокую пропускную способность, но не на время.

Комментарии:

1. Спасибо за ваш ответ. Да, я тоже думал об этом. Итак, какое решение вы бы мне порекомендовали? На мой взгляд, я думаю, что решением был бы периодический планировщик задач (с фиксированным периодическим временем задержки, например, 1 секунда), который проверяет хранилище, например таблицу из базы данных с сохраненными ранее объектами.

2. @AlexSween Непонятно, какова ваша цель. Вероятно, существуют и другие решения, которые не требуют обработки сообщений с определенной задержкой. Поскольку это было бы не по теме, не могли бы вы написать мне напрямую?

3. @AlexSween Вероятно, вам нужна база данных временных рядов , такая как InfluxDB.