#apache-storm
#apache-storm
Вопрос:
Я пытаюсь понять состояние топологии в случае тайм-аута обработки кортежа (не в режиме Trident) Предположим, что во время обработки кортежа в некотором bolt был достигнут порог тайм-аута. В этом случае носик снова отправляет начальный кортеж (с тем же идентификатором сообщения, насколько я понимаю). Теперь допустим, что Bolt завершает обработку кортежа, выдает и подтверждает кортеж. В этом сценарии :
- Будет ли неудачный кортеж по-прежнему обрабатываться топологией, даже если поток выдал новый начальный кортеж?
- Если да, то как будет выглядеть база данных кортежей acker (поскольку создается новая база данных с тем же начальным идентификатором кортежа), что произойдет с предыдущей исходной базой данных?
- что произойдет, когда acker получит подтверждение и отправит с идентификаторами привязки предыдущего DAG ?
Ответ №1:
1: Да, сбой кортежа продолжается. Причина этого в том, что было бы слишком дорого пытаться остановить продолжение неудачного кортежа, поскольку носик должен был бы сообщить всем болтам о сбое.
2: Я думаю, что здесь есть небольшое недоразумение. Когда spout отправляет кортеж, идентификатор сообщения не тот, который Storm использует для отслеживания этого кортежа базы данных / дерева внутри. Вместо этого исполнитель spout генерирует случайный идентификатор (назовите его rootId
) и локально сохраняет отображение rootId -> messageId
. Идентификатор сообщения никогда не покидает исполнителя spout и не распространяется на болты.
Когда исполнитель spout отправляет кортеж дальше, он включает rootId
. Это то rootId
, что используется acker и bolts для идентификации дерева кортежей.
Наконец, когда дерево полностью подтверждено или кортеж завершается с ошибкой, исполнителю spout сообщается, что соответствующий rootId
результат был успешным или неудачным, и он ищет оригинал messageId
в своем локальном отображении.
Поскольку новый emit с тем же messageId
самым получает new rootId
, нет никакой связи между неудачными и новыми кортежами. Storm считает, что они полностью разделены.
Я немного упростил вышесказанное для ясности, чтобы обрабатывать поток, испускаемый несколькими болтами, задействован еще один набор случайных идентификаторов ( anchorId
). Концептуально вы можете представить ситуацию, когда у вас
spout -> bolt1
-> bolt2
обрабатывается так, как если бы топология была
spout -> splitterBolt -> bolt1
-> bolt2
3: Допустим, время ожидания вашего кортежа истекло. Исполнителю spout было сообщено, что rootId
произошел сбой. Когда это происходит, вызывается исполнитель spout spout.fail(msgId)
, а затем удаляет отображение на rootId -> messageId
карте.
Когда acker получает подтверждение, он может отправить подтверждение в spout, если дерево полностью подтверждено. Когда spout получает подтверждение, у него нет ничего, соответствующего rootId
сохраненному, поэтому подтверждение игнорируется.
Если вам интересно взглянуть на код, его можно найти по адресу https://github.com/apache/storm/blob/b48e10559b65e834884d59887b30fc86d2988c20/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java#L109. Вызывается rootId -> messageId
сопоставление pending
.