#apache-kafka #apache-storm #esper #bolt #spout
#apache-kafka #apache-storm #эспер #bolt #spout
Вопрос:
Я новичок в Apache Storm.
Я пытаюсь разработать систему потоковой обработки в реальном времени, используя Apache Kafka, Storm и ESPER CEP engine.
Для этого у меня есть один KafkaSpout, который будет отправлять потоки в Bolts (в котором есть мои запросы CEP) для фильтрации потока.
Я уже создал топологию и пытаюсь запустить ее в локальном кластере
Проблема в том, что запрос CEP, выполняемый в моих bolts, требует пакетов кортежей для выполнения оконных операций над потоками. И в моей топологии KafkaSpout отправляет Болтам для обработки только один кортеж за раз. Итак, мой запрос CEP работает не так, как ожидалось.
Я использую KafkaSpout по умолчанию в Storm. Есть ли какой-либо способ, которым я могу отправить несколько разных кортежей одновременно в Bolts? Некоторые настройки конфигурации могут это сделать или мне нужно создать свой собственный KafkaSpout для этого?
Пожалуйста, помогите!!
Моя топология:
TopologyBuilder builder = новый TopologyBuilder();
builder.setSpout(«KafkaSpout», новый KafkaSpout<>(KafkaSpoutConfig.builder(«localhost:» 9092, «weatherdata»).setProp(ConsumerConfig.GROUP_ID_CONFIG, «weather-consumer-group»).build()),4);
builder.setBolt(«A», новые FeatureSelectionBolt(), 2).globalGrouping(«KafkaSpout»);
builder.setBolt(«B», новый TrendDetectionBolt(), 2).Группировка в случайном порядке(«A»)
Я использую 2 Bolts и один spout.
Мой запрос esper, выполняемый в Bolt A, является
выберите первый (e), последний (e) из weatherEvent.win:длина (3) как e
Здесь я пытаюсь получить первое и последнее событие из окна длиной три из потока событий. Но я получаю одно и то же первое и последнее событие, потому что KafkaSpout отправляет только один кортеж за раз.
Ответ №1:
Spout не может этого сделать, но вы можете использовать любую поддержку окон Storm https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html или просто напишите модуль агрегации bolt и поместите его между spout и остальной топологией.
Итак, ваша топология должна быть spout -> aggregator -> feature selection -> trend detection
.
Я бы порекомендовал вам попробовать встроенную поддержку работы с окнами, но если вы предпочитаете написать свою собственную агрегацию, вашему bolt действительно просто нужно получить некоторое количество кортежей (например, 3) и выдать новый кортеж, содержащий все значения.
Агрегатор bolt должен выполнять что-то вроде
private List<Tuple> buffered;
execute(Tuple input) {
if (buffered.size != 2) {
buffered.add(input)
return
}
Tuple first = buffered.get(0)
Tuple second = buffered.get(1)
Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
List<Tuple> anchors = List.of(first, second, input)
collector.emit(anchors, aggregate)
collector.ack(first, second, input)
buffered.clear()
}
Таким образом, вы получите один кортеж, содержащий содержимое 3 входных кортежей.
Комментарии:
1. Я указываю длину windows и длину скользящего окна, например: builder.setBolt(«FeatureSelectionBolt», new FeatureSelectionBolt().withWindow(новый BaseWindowedBolt. Count(3), новый BaseWindowedBolt. Количество (5)), 2) Но я все равно получаю исключение java.lang. Исключение IllegalArgumentException: длина окна не указана org.apache.storm.topology. WindowedBoltExecutor.validate(WindowedBoltExecutor.java:126) ~[storm-core-1.2.1.jar:1.2.1] org.apache.storm.topology. WindowedBoltExecutor.initWindowManager(WindowedBoltExecutor.java: 200) ~ [storm-core-1.2.1.jar: 1.2.1] В чем здесь проблема?
2. Не уверен, по-моему, все выглядит нормально. Я предполагаю, что FeatureSelectionBolt расширяет BaseWindowedBolt? А также то, что вы не расширяете BaseWindowedBolt никакими bolts, которые вы не вызываете . При включенном окне?
3. Да, FeatureSelectionBolt расширяет BaseWindowedBolt. Кроме того, у меня нет ни одного bolts, который не расширяет BaseWindowedBolt и все еще вызывает .withWindow() . Поэтому не уверен, в чем причина проблемы. К вашему сведению, у меня есть еще два болта (которые расширяют BaseRichBolt, а не BaseWindowedBolt), которые берут кортежи из FeatureSelection Bolt. Кроме того, я отправляю свою топологию в локальный кластер. Я надеюсь, что это не создает проблемы
4. Я не уверен, в чем проблема, но если вы работаете с localcluster, вы можете довольно легко ее отладить. Исключение поступает из github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org /… потому что windowLengthCount равно нулю. Это происходит только в том случае, github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org /… равно false.
5. Конфигурация добавляется в карту stormConf при вызове . withWindow здесь github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org /… . Попробуйте выполнить отладку, чтобы увидеть, сможете ли вы определить, почему этой конфигурации нет в stormConf. Я посмотрю, смогу ли я запустить наш пример оконной топологии в LocalCluster.