#apache-storm
#apache-storm
Вопрос:
Например, у меня есть топология, состоящая из 1 носика и 4 болтов
spout A -> bolt B -> bolt C -> bolt E
-> bolt D
Только если некоторый условный оператор в болте B имеет значение true, он передает кортеж болту C и болту D.
И только если некоторый условный оператор в болте C имеет значение true, он передает кортеж в болт E.
Таким образом, один кортеж может достигать только болта B или (болта C и D).
Я использую BaseBasicBolt, который, насколько мне известно, автоматически активируется после вызова collector.emit.
Например, метод execute в bolt B выглядит следующим образом
public class boltB extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
...some logic goes here
if (response.getCount() > 0) {
collector.emit(new Values(tuple.getString(0)));
}
}
}
Поэтому, если collector.emit не вызывается, я думаю, что кортеж из spout не удался, потому что я вижу из пользовательского интерфейса storm, что почти все кортежи из spout не удалось.
В этом случае, где я должен вызвать ‘ack’, чтобы spout не рассматривал его как неудачный кортеж?
Ответ №1:
То, что вы делаете, соответствует логике, которую вы реализуете. Вам не нужно явно вызывать ack()
. При использовании BaseBasicBolt
каждый кортеж проверяется после execute()
метода by BasicBoltExecutor
. Для неудачных кортежей вам следует проверить наличие исключений. Также попробуйте просмотреть пользовательский интерфейс Storm на предмет аномалий в кортеже, отправленном / выполненном / сбойном для каждого носика и болта.
Ответ №2:
Когда у вас есть BaseBasicBolt — подтверждение выполняется за вас, даже если вы ничего не излучаете.
Экземпляр BaseBasicBolt выполняется в BasicBoltExecutor, чей метод execute() показан ниже:
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch(FailedException e) {
if(e instanceof ReportedFailedException) {
_collector.reportError(e);
}
_collector.getOutputter().fail(input);
}
}
Итак, чтобы остановить обработку кортежа, просто не выполняйте, после вызова execute он будет подтвержден. Поскольку больше нет болтов для запуска, будет вызван обратный вызов ack в spout
Надеюсь, это ответит на ваши вопросы