Как отправить несколько (разных) кортежей из одного KafkaSpout одновременно в bolt?

#apache-kafka #apache-storm #esper #bolt #spout

#apache-kafka #apache-storm #эспер #bolt #spout

Вопрос:

Я новичок в Apache Storm.

Я пытаюсь разработать систему потоковой обработки в реальном времени, используя Apache Kafka, Storm и ESPER CEP engine.

Для этого у меня есть один KafkaSpout, который будет отправлять потоки в Bolts (в котором есть мои запросы CEP) для фильтрации потока.

Я уже создал топологию и пытаюсь запустить ее в локальном кластере

Проблема в том, что запрос CEP, выполняемый в моих bolts, требует пакетов кортежей для выполнения оконных операций над потоками. И в моей топологии KafkaSpout отправляет Болтам для обработки только один кортеж за раз. Итак, мой запрос CEP работает не так, как ожидалось.

Я использую KafkaSpout по умолчанию в Storm. Есть ли какой-либо способ, которым я могу отправить несколько разных кортежей одновременно в Bolts? Некоторые настройки конфигурации могут это сделать или мне нужно создать свой собственный KafkaSpout для этого?

Пожалуйста, помогите!!

Моя топология:

TopologyBuilder builder = новый TopologyBuilder();

builder.setSpout(«KafkaSpout», новый KafkaSpout<>(KafkaSpoutConfig.builder(«localhost:» 9092, «weatherdata»).setProp(ConsumerConfig.GROUP_ID_CONFIG, «weather-consumer-group»).build()),4);

builder.setBolt(«A», новые FeatureSelectionBolt(), 2).globalGrouping(«KafkaSpout»);

builder.setBolt(«B», новый TrendDetectionBolt(), 2).Группировка в случайном порядке(«A»)

Я использую 2 Bolts и один spout.

Мой запрос esper, выполняемый в Bolt A, является

выберите первый (e), последний (e) из weatherEvent.win:длина (3) как e

Здесь я пытаюсь получить первое и последнее событие из окна длиной три из потока событий. Но я получаю одно и то же первое и последнее событие, потому что KafkaSpout отправляет только один кортеж за раз.

Ответ №1:

Spout не может этого сделать, но вы можете использовать любую поддержку окон Storm https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html или просто напишите модуль агрегации bolt и поместите его между spout и остальной топологией.

Итак, ваша топология должна быть spout -> aggregator -> feature selection -> trend detection .

Я бы порекомендовал вам попробовать встроенную поддержку работы с окнами, но если вы предпочитаете написать свою собственную агрегацию, вашему bolt действительно просто нужно получить некоторое количество кортежей (например, 3) и выдать новый кортеж, содержащий все значения.

Агрегатор bolt должен выполнять что-то вроде

 private List<Tuple> buffered;

execute(Tuple input) {
  if (buffered.size != 2) {
    buffered.add(input)
    return
  }
  Tuple first = buffered.get(0)
  Tuple second = buffered.get(1)
  Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
  List<Tuple> anchors = List.of(first, second, input)
  collector.emit(anchors, aggregate)
  collector.ack(first, second, input)
  buffered.clear()
}
  

Таким образом, вы получите один кортеж, содержащий содержимое 3 входных кортежей.

Комментарии:

1. Я указываю длину windows и длину скользящего окна, например: builder.setBolt(«FeatureSelectionBolt», new FeatureSelectionBolt().withWindow(новый BaseWindowedBolt. Count(3), новый BaseWindowedBolt. Количество (5)), 2) Но я все равно получаю исключение java.lang. Исключение IllegalArgumentException: длина окна не указана org.apache.storm.topology. WindowedBoltExecutor.validate(WindowedBoltExecutor.java:126) ~[storm-core-1.2.1.jar:1.2.1] org.apache.storm.topology. WindowedBoltExecutor.initWindowManager(WindowedBoltExecutor.java: 200) ~ [storm-core-1.2.1.jar: 1.2.1] В чем здесь проблема?

2. Не уверен, по-моему, все выглядит нормально. Я предполагаю, что FeatureSelectionBolt расширяет BaseWindowedBolt? А также то, что вы не расширяете BaseWindowedBolt никакими bolts, которые вы не вызываете . При включенном окне?

3. Да, FeatureSelectionBolt расширяет BaseWindowedBolt. Кроме того, у меня нет ни одного bolts, который не расширяет BaseWindowedBolt и все еще вызывает .withWindow() . Поэтому не уверен, в чем причина проблемы. К вашему сведению, у меня есть еще два болта (которые расширяют BaseRichBolt, а не BaseWindowedBolt), которые берут кортежи из FeatureSelection Bolt. Кроме того, я отправляю свою топологию в локальный кластер. Я надеюсь, что это не создает проблемы

4. Я не уверен, в чем проблема, но если вы работаете с localcluster, вы можете довольно легко ее отладить. Исключение поступает из github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org /… потому что windowLengthCount равно нулю. Это происходит только в том случае, github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org /… равно false.

5. Конфигурация добавляется в карту stormConf при вызове . withWindow здесь github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org /… . Попробуйте выполнить отладку, чтобы увидеть, сможете ли вы определить, почему этой конфигурации нет в stormConf. Я посмотрю, смогу ли я запустить наш пример оконной топологии в LocalCluster.