#spring-boot #spring-kafka #spring-el
#spring-boot #spring-кафка #spring-el
Вопрос:
У меня есть это приложение для загрузки Spring.properties
list1=valueA,valueB
list2=valueC
list3=valueD,valueE
topics=list1,list2,list3
То, что я пытаюсь сделать, это использовать в topics
элементе @KafkaListener
аннотации значения свойства values of topics
Используя выражение
@KafkaListener(topics={"#{'${topics}'.split(',')}"})
Я получаю list1
, list2
, list3
в виде разделенной строки
Как я могу выполнить цикл по этому списку, чтобы получить valueA
, valueB
, valueC
, valueD
, valueE
?
Редактировать: я должен проанализировать topics
свойства, чтобы @KafkaListener регистрировался для получения сообщений из тем valueA
, valueB
, и т.д. valueC
Я читал, что можно вызвать метод таким образом:
@KafkaListener(topics="#parse(${topics})")
Итак, я написал этот метод:
public String[] parse(String s) {
ExpressionParser parser = new SpelExpressionParser();
return Arrays.stream(s.split(",").map(key -> (String)(parser.parse(key).getValue())).toArray(String[]::new);
}
Но parse
метод не вызывается
Итак, я попытался напрямую сделать это в аннотациях таким образом:
@KafkaListener(topics="#{Arrays.stream('${topics}'.split(',')).map(key->${key}).toArray(String[]::new)}")
Но также это решение дает мне ошибки.
Редактировать 2:
Изменение таким образом вызывается метод
@KafkaListener(topics="parse()")
@Bean
public String[] parse(String s) {
...
}
Проблема в том, как получить реквизиты «темы» внутри метода
Комментарии:
1. Непонятно, что вы имеете в виду;
topics
принимаетString[]
.2. ОК. Я пытаюсь сделать это :
@KafkaListener(topics="#{parse($(topics))}")
. Метод parse возвращаетArrays.stream(s.split(",")).map(p -> "${p}").toArray(String[]::new);
. Я знаю, что${p}
это не имеет смысла в map, но метод, однако, не вызывается.3. Это все еще не ясно; отредактируйте вопрос, чтобы точно показать, что вы пытаетесь сделать, и в чем проблема.
4. Я отредактировал вопрос. Надеюсь, теперь понятно.
Ответ №1:
Вы не можете вызывать произвольные методы подобным образом; вам нужно ссылаться на компонент @someBean.parse(...)
; использование #parse
требует регистрации статического метода как функции.
Тем не менее, это работает для меня и намного проще:
list1=valueA,valueB
list2=valueC
list3=valueD,valueE
topics=${list1},${list2},${list3}
и
@KafkaListener(id = "so64390079", topics = "#{'${topics}'.split(',')}")
Редактировать
Если вы не можете использовать заполнители topics
, это работает…
@SpringBootApplication
public class So64390079Application {
public static void main(String[] args) {
SpringApplication.run(So64390079Application.class, args);
}
@KafkaListener(id = "so64390079", topics = "#{@parser.parse('${topics}')}")
public void listen(String in) {
System.out.println(in);
}
}
@Component
class Parser implements EnvironmentAware {
private Environment environmment;
@Override
public void setEnvironment(Environment environment) {
this.environmment = environment;
}
public String[] parse(String[] topics) {
StringBuilder sb = new StringBuilder();
for (String topic : topics) {
sb.append(this.environmment.getProperty(topic));
sb.append(',');
}
return StringUtils.commaDelimitedListToStringArray(sb.toString().substring(0, sb.length() - 1));
}
}
Комментарии:
1. Я думал сделать это, но требования заключаются в том, что свойства тем должны быть в
topics=list1,list2,list3
формате.2. Кажется, работает, но он вызывается два раза, а второй раз выдает исключение StringIndexOutOfBoundException, потому что topics params пуст. Почему вызывается два раза?
3. Я понятия не имею; он вызывается только один раз в моем тестовом приложении; попробуйте установить точку останова, чтобы посмотреть, дает ли она какие-либо подсказки. Вы всегда можете добавить код для защиты от этого. (
if (sb.length() > 0
).4. Проверка в режиме отладки, в первый раз я получаю правильный вывод, но во второй раз я получаю ошибку. Очень странно.