Несколько фабрик синхронизации транзакций в одном потоке

#transactions #spring-integration

Вопрос:

У меня есть следующий поток: входящий потоковый адаптер SFTP считывает файл, который является JSON. Затем:

  • синхронизация транзакций регистрируется непосредственно на этом адаптере, чтобы переименовать входной файл при фиксации транзакции (потому что я с этим покончил).
  • прочитанный файл преобразуется в DTO с json-to-object помощью и передается трансформатору, который преобразует его во что-то другое
  • это «что-то еще» должно быть переведено на другой канал, но только после фиксации транзакции

В этом и заключается суть потока:

 lt;int-sftp:inbound-streaming-channel-adapter  session-factory="mySftpSessionFactory"  channel="channel1"  filename-pattern="*.JSON"  remote-directory-expression="'/mypath'"  max-fetch-size="10"gt;  lt;int:poller max-messages-per-poll="1" fixed-delay="1"  time-unit="MINUTES" error-channel="unexpectedErrorChannel"gt;  lt;int:transactional  synchronization-factory="synchFactory1"  transaction-manager="transactionManager" /gt;  lt;/int:pollergt; lt;/int-sftp:inbound-streaming-channel-adaptergt;  lt;int:transaction-synchronization-factory id="synchFactory1"gt;  lt;int:after-commit channel="onCommitRemoteFileRenameChannel" /gt; lt;/int:transaction-synchronization-factorygt;  lt;int:channel id="channel1" /gt;  lt;int:chain input-channel="channel1" output-channel="nullChannel"gt;  lt;int:stream-transformer charset="UTF-8" /gt;  lt;int:json-to-object-transformer type="com.example.MyDto" /gt;  lt;int:transformer ref="myTransformer"gt;  lt;int:transactional synchronization-factory="synchFactory2"  transaction-manager="transactionManager" /gt;  lt;/int:transformergt; lt;/int:chaingt;  lt;int:transaction-synchronization-factory id="synchFactory2"gt;  lt;int:after-commit channel="onCommitSecondFlowChannel" /gt; lt;/int:transaction-synchronization-factorygt;  lt;int:channel id="onCommitRemoteFileRenameChannel" /gt; lt;int-sftp:outbound-gateway  session-factory="mySftpSessionFactory"  request-channel="onCommitRemoteFileRenameChannel"   command="mv"  expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE]))"  rename-expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE].concat('.done')))"  requires-reply="false"  reply-channel="nullChannel" /gt;  

Второе lt;int:transactionalgt; , объявленное внутри трансформатора, предназначено не для фактического открытия новой транзакции (должно ПОТРЕБОВАТЬСЯ распространение по умолчанию, поэтому оно должно присоединиться к основной транзакции, открытой опрашивающим), а скорее для добавления второй синхронизации, которая запускает сообщение, возвращенное трансформатором, в канал, названный onCommitSecondFlowChannel для дальнейшей обработки.

Однако, похоже, это работает неправильно: в то время как удаленное переименование файла выполняется правильно, вторая синхронизация не применяется, так как я не получаю никаких сообщений, полученных от преобразователя в onCommitSecondFlowChannel . Я попытался отладить и вижу, что оба lt;int:transacationl-synchronization-factorygt; s анализируются, но метод второго ExpressionEvaluatingTransactionSynchronizationProcessor.processAfterCommit(IntegrationResourceHolder) никогда не выполняется.

Есть ли способ получить этот результат без объявления активатора службы и шлюза после трансформатора, чтобы разместить сообщение на этом втором канале потока?

Ответ №1:

Я думаю, что вы чрезмерно усложняете свой поток с помощью этой дополнительной синхронизации tx. На самом деле он не был предназначен для того, чтобы делать что-то в середине потока. Это скорее какой-то глобальный крючок с того момента, когда мы начинаем транзакцию. Поскольку вы этого не делаете в этом трансформаторе, следовательно, он не может быть применен.

Мы можем пересмотреть его в будущем, так как я понимаю ваши чувства, и действительно логично, что такая синхронизация tx должна присоединиться к существующему TX, если на данный момент не может начать новую.

В любом случае, я бы не стал этого делать, так как это делает поток не интуитивно понятным.

Я вижу, у вас есть это lt;int:chain input-channel="channel1" output-channel="nullChannel"gt; nullChannel в качестве вывода после этого lt;transformergt; .

Итак, как насчет того, чтобы lt;transactionalgt; полностью удалить это и разместить onCommitSecondFlowChannel в качестве вывода для этого lt;chaingt; ? В любом случае, похоже, что это конец вашего потока, поэтому нет необходимости откладывать что-то до конца транзакции.

Комментарии:

1. Спасибо Артему за вашу проницательность. Ну, первая версия потока действительно была той, которую вы описали, но нисходящий поток включает в себя несколько отдельных транзакций: onCommitSecondFlowChannel включение вывода цепочки приводит к тому, что «tx был настроен только на откат», если один такой tx откатывается, потому что опрашивающий (внешний) tx является совместным. Поэтому я попытался с помощью диспетчера преодолеть границу tx для опроса, но затем у меня возникли оптимистичные исключения блокировки, потому что оба потока изменяют одни и те же объекты слишком близко друг от друга. Отсюда и идея сериализации потоков (хорошо, если второй выполняется только при первой фиксации).

2. Затем я добавлю активатор службы после трансформатора, который вызывает шлюз во второй поток после фиксации транзакции: это некоторый код, которого я предпочел бы избежать, но, похоже, это необходимо.