#java #apache-kafka
#java #apache-kafka
Вопрос:
В настоящее время я борюсь с потребителем в kafka, который может каким-то образом запланировать выполнение на будущее.
Подводя итог: у меня есть хранилище больших данных (файл .csv), и записи содержат 2 столбца: временную метку и значение. Я пытаюсь обработать эти значения на основе их метки времени. Первая запись должна быть немедленно использована kafka, следующая должна быть обработана в будущем с задержкой «временная метка текущей записи — временная метка предыдущей записи» (это не очень большая разница, всего несколько секунд = результат будет в миллисекундах) и так далее.
Поэтому, по сути, я не могу найти решение для реализации потребителя в kafka, который принимает каждую запись на основе метки времени и использует эту точную задержку. Мне нужно просто смоделировать эти значения, и они должны быть вставлены в БД в соответствии с этой задержкой, чтобы работать должным образом.
Я пытался обходить потоки с исполнителями, но для больших данных это неправильный способ.
Я попытался создать динамические темы для производителей на основе метки времени, а затем подписаться на них, а затем каким-то образом обработать с помощью очереди. Это не сработало.
Я ожидаю, что kafka будет использовать каждую запись с задержкой, основанной на отметке времени.
Комментарии:
1. Пожалуйста, обратите внимание, что Kafka получает (использует) сообщения от
Producer
. Потребитель Kafka просто считывает сообщения из Kafka и обрабатывает поток сообщений независимо от задержки между ними.2. Если вы хотите задержать потребителя, почему бы вам просто не добавить a
Thread.sleep()
в цикл потребителейwhile
на основе ранее прочитанного значения?3. Потому что у меня есть несколько идентичных наборов данных, которые должны выполняться одновременно (или очень близко друг к другу) в системной временной метке. Если я просто переведу поток в режим ожидания, 2 идентичных значения будут иметь разные временные метки, даже не близко друг к другу.
Ответ №1:
Я ожидаю, что kafka будет использовать каждую запись с задержкой, основанной на отметке времени
Если у вас есть определенная задержка между сообщениями, то Kafka не является подходящим решением. Когда вы отправляете сообщения в Kafka, в большинстве сценариев вы используете сеть. Что может добавить свою собственную, непредсказуемую задержку. Kafka выполняется как другой процесс, и никто не может гарантировать, в какой момент этот процесс будет готов к приему следующего сообщения. ОС может приостановить процесс, GC может запуститься и т.д. Это добавляет еще одну задержку, которую никто не мог предсказать.
Кроме того, Kafka не предназначен для работы со временем получения сообщения. Он больше ориентирован на порядок сообщений, низкую задержку и высокую пропускную способность, но не на время.
Комментарии:
1. Спасибо за ваш ответ. Да, я тоже думал об этом. Итак, какое решение вы бы мне порекомендовали? На мой взгляд, я думаю, что решением был бы периодический планировщик задач (с фиксированным периодическим временем задержки, например, 1 секунда), который проверяет хранилище, например таблицу из базы данных с сохраненными ранее объектами.
2. @AlexSween Непонятно, какова ваша цель. Вероятно, существуют и другие решения, которые не требуют обработки сообщений с определенной задержкой. Поскольку это было бы не по теме, не могли бы вы написать мне напрямую?
3. @AlexSween Вероятно, вам нужна база данных временных рядов , такая как InfluxDB.