#java #apache-kafka #spring-data #spring-kafka
#java #apache-kafka #spring-данные #spring-kafka
Вопрос:
Я работаю над приложением, которое использует Kafka для получения сообщений из нескольких тем, сохраняя данные по мере их поступления.
С этой целью я использую класс @Service с несколькими методами, аннотированными @KafkaListener . Рассмотрим это:
@Transactional
@KafkaListener(topics = MyFirstMessage.TOPIC, autoStartup = "false", containerFactory = "myFirstKafkaListenerContainerFactory")
public void handleMyFirstMessage(ConsumerRecord<String, MyFirstMessage> record, Acknowledgment acknowledgment) throws Exception {
MyFirstMessage message = consume(record, acknowledgment);
try {
doHandle(record.key(), message);
} catch (Exception e) {
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
} finally {
acknowledgment.acknowledge();
}
}
@Transactional
@KafkaListener(topics = MySecondMessage.TOPIC, autoStartup = "false", containerFactory = "mySecondKafkaListenerContainerFactory")
public void handleMySecondMessage(ConsumerRecord<String, MySecondMessage> record, Acknowledgment acknowledgment) throws Exception {
MySecondMessage message = consume(record, acknowledgment);
try {
doHandle(record.key(), message);
} catch (Exception e) {
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
} finally {
acknowledgment.acknowledge();
}
}
Пожалуйста, не обращайте внимания на информацию о setRollbackOnly, это не имеет отношения к этому вопросу.
Важно то, что методы doHandle() в каждом прослушивателе выполняют вставки в таблицу, которые иногда терпят неудачу, потому что автоматически сгенерированные ключи оказываются неуникальными после завершения окончательной фиксации.
Происходит то, что каждый метод doHandle() будет увеличивать ключевой столбец в своих собственных небольших транзакциях, и только один из них «выиграет» эту гонку. Другой завершится ошибкой во время фиксации с нарушением неуникального ограничения.
Как лучше всего справиться с этим? Как мне «синхронизировать» транзакции, чтобы они выполнялись как жемчужины в строке вместо всех сразу?
Я подумываю об использовании какого-то семафора или блокировки для сериализации вещей, но это пахнет как решение со многими подводными камнями. Если бы существовал общий шаблон или структура, помогающие решить эту проблему, мне было бы гораздо удобнее ее реализовать.
Ответ №1:
Смотрите документацию.
Использование @Transactional
для DB и a KafkaTransactionManager
в контейнере слушателя аналогично использованию a ChainedKafkaTransactionManager
(настроенного с обоими TM) в контейнере. Передача данных в БД фиксируется, за ней следует Kafka, когда слушатель завершает работу в обычном режиме.
Когда слушатель выдает исключение, обе транзакции откатываются в том же порядке.
Это setRollbackOnly
определенно относится к этому вопросу, поскольку при этом вы не откатываете транзакцию kafka.
Комментарии:
1. Привет, Гэри, я надеялся, что ты заметишь этот вопрос 🙂 Спасибо за отзыв, но я думаю, что здесь мы говорим не об одном и том же. Два аннотированных метода вызываются одновременно (я предполагаю, что потребителями в нескольких потоках), что приводит к запуску двух транзакций, что, в свою очередь, приводит к сбою одной из них в конце, когда происходит фиксация. Если spring будет вызывать методы прослушивателя по одному, этого не произойдет.
2. Что касается setRollback, это сделано намеренно. Исключение полностью «обрабатывается», если хотите, путем отката DB tx (без повторной попытки Kafka или чего-либо подобного). Мы отправляем подтверждение Kafka в блоке finally, чтобы указать, что мы хотим перейти к следующему сообщению.
3. Извините за отключение; мое предложение состояло в том, чтобы разрешить второй сбой и откат, чтобы он был повторно доставлен. Нет; два контейнера независимы и выполняются в отдельных потоках, поэтому Spring не может вызывать обоих слушателей в одном потоке с такой конфигурацией. Одним из вариантов было бы использовать один метод для получения необработанных данных (
ByteArrayDeserializer
) и подписки на обе темы, а затем преобразовать вMy*Message
объекты внутри метода. Таким образом, вы получите только один поток.4. Я тоже думал об этом, но это совсем не кажется оптимальным. Но если я позволю исключению DataIntegrityViolationException распространяться из прослушивателя, тогда db tx откатится, и вся операция перезапустится. И поскольку мое уникальное ограничение не будет нарушено во второй раз, это означает, что оно будет выполнено при первой повторной попытке — и решит мою проблему. Это что-то вроде того, что вы предложили, поэтому я собираюсь принять этот ответ. Еще раз спасибо за то, что всегда помогал здесь, в Stackoverflow, Гэри — очень признателен.