Вопросы о таймауте кортежа Apache Storm

#apache-storm

#apache-storm

Вопрос:

Я пытаюсь понять состояние топологии в случае тайм-аута обработки кортежа (не в режиме Trident) Предположим, что во время обработки кортежа в некотором bolt был достигнут порог тайм-аута. В этом случае носик снова отправляет начальный кортеж (с тем же идентификатором сообщения, насколько я понимаю). Теперь допустим, что Bolt завершает обработку кортежа, выдает и подтверждает кортеж. В этом сценарии :

  1. Будет ли неудачный кортеж по-прежнему обрабатываться топологией, даже если поток выдал новый начальный кортеж?
  2. Если да, то как будет выглядеть база данных кортежей acker (поскольку создается новая база данных с тем же начальным идентификатором кортежа), что произойдет с предыдущей исходной базой данных?
  3. что произойдет, когда acker получит подтверждение и отправит с идентификаторами привязки предыдущего DAG ?

Ответ №1:

1: Да, сбой кортежа продолжается. Причина этого в том, что было бы слишком дорого пытаться остановить продолжение неудачного кортежа, поскольку носик должен был бы сообщить всем болтам о сбое.

2: Я думаю, что здесь есть небольшое недоразумение. Когда spout отправляет кортеж, идентификатор сообщения не тот, который Storm использует для отслеживания этого кортежа базы данных / дерева внутри. Вместо этого исполнитель spout генерирует случайный идентификатор (назовите его rootId ) и локально сохраняет отображение rootId -> messageId . Идентификатор сообщения никогда не покидает исполнителя spout и не распространяется на болты.

Когда исполнитель spout отправляет кортеж дальше, он включает rootId . Это то rootId , что используется acker и bolts для идентификации дерева кортежей.

Наконец, когда дерево полностью подтверждено или кортеж завершается с ошибкой, исполнителю spout сообщается, что соответствующий rootId результат был успешным или неудачным, и он ищет оригинал messageId в своем локальном отображении.

Поскольку новый emit с тем же messageId самым получает new rootId , нет никакой связи между неудачными и новыми кортежами. Storm считает, что они полностью разделены.

Я немного упростил вышесказанное для ясности, чтобы обрабатывать поток, испускаемый несколькими болтами, задействован еще один набор случайных идентификаторов ( anchorId ). Концептуально вы можете представить ситуацию, когда у вас

 spout -> bolt1
      -> bolt2
  

обрабатывается так, как если бы топология была

 spout -> splitterBolt -> bolt1
                      -> bolt2
  

3: Допустим, время ожидания вашего кортежа истекло. Исполнителю spout было сообщено, что rootId произошел сбой. Когда это происходит, вызывается исполнитель spout spout.fail(msgId) , а затем удаляет отображение на rootId -> messageId карте.

Когда acker получает подтверждение, он может отправить подтверждение в spout, если дерево полностью подтверждено. Когда spout получает подтверждение, у него нет ничего, соответствующего rootId сохраненному, поэтому подтверждение игнорируется.

Если вам интересно взглянуть на код, его можно найти по адресу https://github.com/apache/storm/blob/b48e10559b65e834884d59887b30fc86d2988c20/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java#L109. Вызывается rootId -> messageId сопоставление pending .