Как создать канал очереди LIFO

#java #spring #kotlin #spring-integration

#java #spring #kotlin #spring-интеграция

Вопрос:

В настоящее время я использую канал очереди в моем потоке интеграции, но он использует способ извлечения FIFO. Есть ли способ изменить его на LIFO?

Кроме того, есть ли способ удалять сообщения из очереди на основе свойства?

Я подозреваю, что мне нужно будет использовать PriorityChannel для переключения с FIFO на LIFO, но я не могу понять, как это будет достигнуто.

 @Bean
    fun notificationChannel(): MessageChannel {
        return MessageChannels.queue().get()
    }
  

Например, мой канал очереди будет заполнен сообщениями, содержащими идентификатор пользователя. Поскольку меня интересует только последнее состояние сообщения с идентификатором пользователя, я хочу использовать LIFO и удалять все сообщения с тем же идентификатором пользователя, что и последнее сообщение.

Ответ №1:

Похоже, вам на самом деле не нужен LIFO, вам просто нужна последняя версия для данного условия.

Но, несмотря на это, Queue это довольно простой интерфейс; в нем всего несколько методов, и QueueChannel в нем используются poll() только offer() , size() и, ,.

Таким образом, должно быть просто создать пользовательскую очередь, например, на основе потокобезопасности ConcurrentHashMap<String, Message<?>> с ключом, являющимся вашим условием.

Комментарии:

1. Как мне поступить, чтобы более старые сообщения с тем же идентификатором пользователя в очереди были отброшены?

2. Если он основан на a Map , новые записи с тем же ключом будут заменены, и в нем будет только последняя.

3. Если вы хотите сохранить порядок для разных идентификаторов пользователей, вы можете использовать комбинацию a LinkedHashSet<String> для идентификаторов пользователей и Map<String, Message<?>> для сообщений. LinkedHashSet Сохраняется исходный порядок при замене элемента.

Ответ №2:

Канал очереди принимает Queue в качестве параметра конструктора public QueueChannel(Queue<Message<?>> queue) , который предоставляет spring-integration DSL public static QueueChannelSpec queue(Queue<Message<?>> queue) . Таким образом, вы можете использовать Collections.asLifoQueue(..) в качестве аргумента вышеуказанного фабричного метода, чтобы получить желаемое поведение.