Axon — SubscribingEvent против процессора TrackingEvent

#java #domain-driven-design #cqrs #event-sourcing #axon

#java #дизайн, управляемый доменом #cqrs #источник событий #axon

Вопрос:

В настоящее время я использую SubscribingEvent processor в Axon. Используя это, все выполняется в одном потоке (поскольку я хочу выполнить команду и применить события к проекции в одном потоке), чтобы убедиться, что либо все сохранено в DB, либо все откатывается.

Что произойдет, если мы используем TrackingEvent processor, в случае, если эта команда успешно выполнена в Aggregate и что события, которые генерирует Aggregate, сохраняются в БД, но приложение завершается сбоем непосредственно перед выполнением прогнозов (это означает, что не сохраняется в БД)? Будет ли приложение после перезапуска продолжать проецировать эти события?

В моем случае я выполняю проекцию при вызове REST, поэтому, я думаю, для меня имеет смысл использовать SubscribingEvent processor (так что либо все в порядке, либо ничего). В случае, если я использую процессор TrackingEvent, и приложение выходит из строя между сохранением и проектированием — у меня будет несогласованное состояние. И даже если проекция будет перезапущена при следующей загрузке (как я предполагаю), клиент снова отправит ту же команду (думая, что это сбой), но что произойдет в совокупности, если он получит ту же команду во второй раз?

Ответ №1:

позвольте мне попытаться дать вам некоторое представление об этом!

Что произойдет, если мы используем TrackingEvent processor, в случае, если эта команда успешно выполнена в Aggregate и что события, которые генерирует Aggregate, сохраняются в БД, но приложение завершается сбоем непосредственно перед выполнением прогнозов (это означает, что не сохраняется в БД)?

TrackingEventProcessor В Axon отслеживается, какие события он обработал, с помощью TrackingToken . Процессор отслеживания обновит только TrackingToken после вызова всех компонентов обработки событий (например, компонентов, обновляющих ваши прогнозы). Таким образом, при перезапуске в этот момент времени процессор отслеживания снова обработает событие. При этом он пытается быть в курсе всего вашего потока событий.

В случае, если я использую процессор TrackingEvent, и приложение выходит из строя между сохранением и проектированием — у меня будет несогласованное состояние.

Это зависит от того, как вы реализовали свое приложение. Похоже, что все довольно тесно связано, и при этом не учитывается тот факт, что вам придется иметь дело с возможной согласованностью. Я знаю, это легче сказать, чем сделать, но то, что ваш интерфейс не ожидает немедленного ответа при вводе команды, в конечном итоге упростит систему. Опять же, это сильно зависит от того, можете ли вы повлиять на точку зрения пользовательского интерфейса на выполнение действий; не уверен, возможно ли это в вашей ситуации.

И даже если проекция будет перезапущена при следующей загрузке (как я предполагаю), клиент снова отправит ту же команду (думая, что это сбой), но что произойдет в совокупности, если он получит ту же команду во второй раз?

Это полностью зависит от того, как вы реализовали свой Aggregate. Агрегат должен принимать решение, может ли команда быть выполнена или нет. Если частью этого процесса принятия решений является обеспечение того, чтобы предыдущая команда не была идентична текущей операции, то вам нужно будет учесть это в @CommandHandler аннотированных функциях.

Я надеюсь, что это даст вам некоторое представление, Боян, и не стесняйтесь отвечать на дополнительные вопросы, чтобы получить представление об этой части Axon.

Комментарии:

1. Спасибо. Теперь это понятнее. Просто еще одно уточнение, связанное с TrackingEventProcessor . Вы говорите, что обновление TrackingToken произойдет после вызова всех обработчиков событий. Это означает, что если у меня есть 2 проецирующих компонента, и компонент 1 успешно сохранил проекцию в db, и непосредственно перед тем, как второй компонент сохранит проекцию в db — компонент будет уничтожен. При перезапуске 1-й проецирующий компонент и 2-й снова получат одно и то же событие? Это означает, что мне нужно будет обрабатывать дедупликацию событий также в прогнозах (в данном случае компонент 1, поскольку компонент 2 его еще не обработал)?

2. Ах, извините за некоторую расплывчатость в этой области, позвольте мне уточнить. Axon выдаст все обработчики событий в рамках a TrackingEventProcessor в одной транзакции (используя UnitOfWork его возможность запускать и останавливать транзакцию). Таким образом, если вашему первому обработчику событий удастся обновить проекцию, а второму не удастся, вся транзакция будет откатана. Таким образом, вам не придется обрабатывать дедупликацию в этом пространстве.