#transactions #spring-integration
Вопрос:
У меня есть следующий поток: входящий потоковый адаптер SFTP считывает файл, который является JSON. Затем:
- синхронизация транзакций регистрируется непосредственно на этом адаптере, чтобы переименовать входной файл при фиксации транзакции (потому что я с этим покончил).
- прочитанный файл преобразуется в DTO с
json-to-object
помощью и передается трансформатору, который преобразует его во что-то другое - это «что-то еще» должно быть переведено на другой канал, но только после фиксации транзакции
В этом и заключается суть потока:
lt;int-sftp:inbound-streaming-channel-adapter session-factory="mySftpSessionFactory" channel="channel1" filename-pattern="*.JSON" remote-directory-expression="'/mypath'" max-fetch-size="10"gt; lt;int:poller max-messages-per-poll="1" fixed-delay="1" time-unit="MINUTES" error-channel="unexpectedErrorChannel"gt; lt;int:transactional synchronization-factory="synchFactory1" transaction-manager="transactionManager" /gt; lt;/int:pollergt; lt;/int-sftp:inbound-streaming-channel-adaptergt; lt;int:transaction-synchronization-factory id="synchFactory1"gt; lt;int:after-commit channel="onCommitRemoteFileRenameChannel" /gt; lt;/int:transaction-synchronization-factorygt; lt;int:channel id="channel1" /gt; lt;int:chain input-channel="channel1" output-channel="nullChannel"gt; lt;int:stream-transformer charset="UTF-8" /gt; lt;int:json-to-object-transformer type="com.example.MyDto" /gt; lt;int:transformer ref="myTransformer"gt; lt;int:transactional synchronization-factory="synchFactory2" transaction-manager="transactionManager" /gt; lt;/int:transformergt; lt;/int:chaingt; lt;int:transaction-synchronization-factory id="synchFactory2"gt; lt;int:after-commit channel="onCommitSecondFlowChannel" /gt; lt;/int:transaction-synchronization-factorygt; lt;int:channel id="onCommitRemoteFileRenameChannel" /gt; lt;int-sftp:outbound-gateway session-factory="mySftpSessionFactory" request-channel="onCommitRemoteFileRenameChannel" command="mv" expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE]))" rename-expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE].concat('.done')))" requires-reply="false" reply-channel="nullChannel" /gt;
Второе lt;int:transactionalgt;
, объявленное внутри трансформатора, предназначено не для фактического открытия новой транзакции (должно ПОТРЕБОВАТЬСЯ распространение по умолчанию, поэтому оно должно присоединиться к основной транзакции, открытой опрашивающим), а скорее для добавления второй синхронизации, которая запускает сообщение, возвращенное трансформатором, в канал, названный onCommitSecondFlowChannel
для дальнейшей обработки.
Однако, похоже, это работает неправильно: в то время как удаленное переименование файла выполняется правильно, вторая синхронизация не применяется, так как я не получаю никаких сообщений, полученных от преобразователя в onCommitSecondFlowChannel
. Я попытался отладить и вижу, что оба lt;int:transacationl-synchronization-factorygt;
s анализируются, но метод второго ExpressionEvaluatingTransactionSynchronizationProcessor.processAfterCommit(IntegrationResourceHolder)
никогда не выполняется.
Есть ли способ получить этот результат без объявления активатора службы и шлюза после трансформатора, чтобы разместить сообщение на этом втором канале потока?
Ответ №1:
Я думаю, что вы чрезмерно усложняете свой поток с помощью этой дополнительной синхронизации tx. На самом деле он не был предназначен для того, чтобы делать что-то в середине потока. Это скорее какой-то глобальный крючок с того момента, когда мы начинаем транзакцию. Поскольку вы этого не делаете в этом трансформаторе, следовательно, он не может быть применен.
Мы можем пересмотреть его в будущем, так как я понимаю ваши чувства, и действительно логично, что такая синхронизация tx должна присоединиться к существующему TX, если на данный момент не может начать новую.
В любом случае, я бы не стал этого делать, так как это делает поток не интуитивно понятным.
Я вижу, у вас есть это lt;int:chain input-channel="channel1" output-channel="nullChannel"gt;
— nullChannel
в качестве вывода после этого lt;transformergt;
.
Итак, как насчет того, чтобы lt;transactionalgt;
полностью удалить это и разместить onCommitSecondFlowChannel
в качестве вывода для этого lt;chaingt;
? В любом случае, похоже, что это конец вашего потока, поэтому нет необходимости откладывать что-то до конца транзакции.
Комментарии:
1. Спасибо Артему за вашу проницательность. Ну, первая версия потока действительно была той, которую вы описали, но нисходящий поток включает в себя несколько отдельных транзакций:
onCommitSecondFlowChannel
включение вывода цепочки приводит к тому, что «tx был настроен только на откат», если один такой tx откатывается, потому что опрашивающий (внешний) tx является совместным. Поэтому я попытался с помощью диспетчера преодолеть границу tx для опроса, но затем у меня возникли оптимистичные исключения блокировки, потому что оба потока изменяют одни и те же объекты слишком близко друг от друга. Отсюда и идея сериализации потоков (хорошо, если второй выполняется только при первой фиксации).2. Затем я добавлю активатор службы после трансформатора, который вызывает шлюз во второй поток после фиксации транзакции: это некоторый код, которого я предпочел бы избежать, но, похоже, это необходимо.