остановить обработку кортежа в определенном болте

#apache-storm

#apache-storm

Вопрос:

Например, у меня есть топология, состоящая из 1 носика и 4 болтов

 spout A -> bolt B -> bolt C -> bolt E
                  -> bolt D
  

Только если некоторый условный оператор в болте B имеет значение true, он передает кортеж болту C и болту D.

И только если некоторый условный оператор в болте C имеет значение true, он передает кортеж в болт E.

Таким образом, один кортеж может достигать только болта B или (болта C и D).

Я использую BaseBasicBolt, который, насколько мне известно, автоматически активируется после вызова collector.emit.

Например, метод execute в bolt B выглядит следующим образом

 public class boltB extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        ...some logic goes here
        if (response.getCount() > 0) {
            collector.emit(new Values(tuple.getString(0)));
        }
    }
}
  

Поэтому, если collector.emit не вызывается, я думаю, что кортеж из spout не удался, потому что я вижу из пользовательского интерфейса storm, что почти все кортежи из spout не удалось.

В этом случае, где я должен вызвать ‘ack’, чтобы spout не рассматривал его как неудачный кортеж?

Ответ №1:

То, что вы делаете, соответствует логике, которую вы реализуете. Вам не нужно явно вызывать ack() . При использовании BaseBasicBolt каждый кортеж проверяется после execute() метода by BasicBoltExecutor . Для неудачных кортежей вам следует проверить наличие исключений. Также попробуйте просмотреть пользовательский интерфейс Storm на предмет аномалий в кортеже, отправленном / выполненном / сбойном для каждого носика и болта.

Ответ №2:

Когда у вас есть BaseBasicBolt — подтверждение выполняется за вас, даже если вы ничего не излучаете.

Экземпляр BaseBasicBolt выполняется в BasicBoltExecutor, чей метод execute() показан ниже:

 public void execute(Tuple input) {
     _collector.setContext(input);
     try {
         _bolt.execute(input, _collector);
         _collector.getOutputter().ack(input);
     } catch(FailedException e) {
         if(e instanceof ReportedFailedException) {
             _collector.reportError(e);
         }
         _collector.getOutputter().fail(input);
     }
}
  

Итак, чтобы остановить обработку кортежа, просто не выполняйте, после вызова execute он будет подтвержден. Поскольку больше нет болтов для запуска, будет вызван обратный вызов ack в spout

Надеюсь, это ответит на ваши вопросы