PayloadTypeRouter напрямую отправляет в трансформатор без промежуточного канала?

#spring #spring-boot #spring-integration #messaging #enterprise-integration

#spring #spring-boot #spring-интеграция #обмен сообщениями #корпоративная интеграция

Вопрос:

Мне нужно использовать a PayloadTypeRouter и хочу отправить маршрутизируемое сообщение непосредственно в a Transformer , Filter или ServiceActivator . Все должно быть настроено с помощью Kotlin DSL (или Java DSL).

В настоящее время симпатичная часть кода выглядит так:

     @Bean
    fun routeAzureC2DMessage() = integrationFlow {
        channel(IoTHubChannelNames.IOTHUB_TO_DEVICE_CHANNEL)
        route<IotHubC2DRequestMessage<IotHubC2DRequest>> {
            when (it.payload) {
                is IotHubDesiredPropertyUpdate -> IoTHubChannelNames.DESIRED_PROPERTY_UPDATE
                is IotHubMessageToDevice -> IoTHubChannelNames.MESSAGE_TO_DEVICE
            }
        }
    }
 

а затем продолжается (одна сторона маршрутизации)

     @Bean
    fun processMessageToDevice() = integrationFlow {
        channel(IoTHubChannelNames.MESSAGE_TO_DEVICE)
        filter(StructureFilter())
        transform(MessageTransformer())
        channel(SharedChannelNames.CLOUD2DEVICE)
    }
 

Я хотел бы избавиться от ненужного канала IoTHubChannelNames.MESSAGE_TO_DEVICE . Я попробовал несколько подходов, и в другой части проекта я понял что-то вроде этого (Java DSL)

 IntegrationFlows
            .from(channelName)
            .route({ message: IotHubMessage -> message.javaClass }) { router: RouterSpec<Class<*>?, MethodInvokingRouter?> ->
                router
                    .subFlowMapping(DeviceToCloudMessage::class.java) {
                        it.handle(gateway, "handleD2CMessage")
                    }
                    .subFlowMapping(DeviceTwinUpdateMessage::class.java) {
                        it.handle(gateway, "handleDeviceTwinReportedProperty")
                    }
            }
            .get()
 

Являются subFlowMapping ли s единственным способом избавиться от промежуточных каналов? Я хотел бы решение, в котором я все еще могу использовать when (it.payload) , а затем вместо имени канала / канала может возвращать новую integrationFlow или какую-либо другую форму определения потока.

Ответ №1:

Единственное решение на данный момент действительно через API:

 inline fun <reified P, T> route(
        crossinline function: (P) -> T,
        crossinline configurer: KotlinRouterSpec<T, MethodInvokingRouter>.() -> Unit) {
 

То, что вы спрашиваете с этим when(...) is синтаксисом, на данный момент не поддерживается.

Не стесняйтесь поднимать проблему GH по этому вопросу и делиться как можно более подробной информацией о том, что это такое с точки зрения Kotlin и как это можно использовать в Spring Integration DSL.

Обновить

С другой стороны, это не так уж плохо с текущей поддержкой Kotlin:

             route<Int, Boolean>({ it % 2 == 0 }) {
                subFlowMapping(true) { handle<Int> { p, _ -> p * 2 } }
                subFlowMapping(false) { handle<Int> { p, _ -> p * 3 } }
            }
 

Итак, аргументом route() метода является a when() и subFlowMapping() is -> выводится в качестве integrationFlow результата построения. Итак, вероятно, мы не будем использовать Kotlin when() , который не даст нам слишком большого выигрыша, если мы не проиграем subFlowMapping в пользу -> operator …

ОБНОВЛЕНИЕ 2

После дополнительных размышлений об этом и поиска возможного решения я должен отозвать свой запрос на when() запрос функции такого рода.

Основная проблема в том, что IntegrationFlow вместе со всей его конфигурацией она должна быть зарегистрирована в контексте приложения заранее, перед использованием. То, что вы запрашиваете с when() помощью функции in the original router, — это не то, что фреймворк может обнаружить и обработать для вас. Эта функция не является частью фреймворка, чтобы брать на себя ответственность за полученный результат.

Ну, мы могли бы проверить IntegrationFlow возврат, чтобы принять решение о том, как вызывать, но нет никакой гарантии, что поток, который вы собираетесь вернуть из этой функции, является зарегистрированным компонентом. И когда мы действительно регистрируем его и пытаемся использовать в такой функции, это не отличается от того, что мы имеем до сих пор с каналами, возвращающимися из этой функции, и их отображением где-то в каком-то потоковом компоненте.

Мы можем автоматически зарегистрироваться IntegrationFlow как компонент в некоторых разработанных для этого инструкциях, например subFlowMapping() . В любом случае это делается только один раз, на этапе настройки. Но это не так хорошо делать, когда код конечного пользователя возвращает потоки во время выполнения. Лучше возвращать каналы или некоторые другие ключи, где у нас есть отображение для существующих потоков.

Я лично предпочитаю не игнорировать MessageChannel абстракцию и использовать ее, когда мне нужно распределить логику между разными потоками. Код в одном потоке выглядит более чистым, когда он линейный и представляет собой единую логическую единицу работы. Однако другие потоки могут быть повторно использованы в другой логике. Мне нужно было бы только указать на входные каналы этих потоков из других мест!

Тем не менее, мой главный момент: мы должны зарегистрироваться и IntegrationFlow , прежде чем мы собираемся отправить ему сообщение.

Комментарии:

1. Спасибо! Важной особенностью (по крайней мере, для меня) kotlin when() является то, что он предоставляет вам «проверки исчерпываемости времени компиляции». Это просто означает: если я введу новый подтип сообщения в моем первом примере выше, я получу ошибку времени компиляции против реализация subflow, в которой ошибка будет только во время выполнения, вошла в some errorChannel . Поэтому я думаю, что все еще может быть полезно иметь это в Kotlin DSL.

2. Не уверен, что вы имеете в виду, если мы действительно используем общий тип (время компиляции) из вывода функции маршрута в эти вложенные приложения.

3. Смотрите ОБНОВЛЕНИЕ 2. -1 Спасибо за ваш запрос.