#java #spring #apache-kafka #spring-kafka #spring-cloud-stream
#java #spring #apache-kafka #spring-kafka #spring-cloud-stream
Вопрос:
У меня есть рабочая настройка для потоков Spring Cloud Kafka с функциональным стилем программирования. Существует два варианта использования, которые настраиваются через application.properties
. Оба они работают по отдельности, но как только я активирую оба одновременно, я получаю ошибку сериализации для выходного потока второго варианта использования:
Exception in thread "ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Error encountered sending record to topic outputActivities for task 0_0 due to:
...
Caused by: org.apache.kafka.common.errors.SerializationException:
Can't serialize data [com.example.connector.model.Activity@497b37ff] for topic [outputActivities]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Incompatible types: declared root type ([simple type, class com.example.connector.model.Material]) vs com.example.connector.model.Activity
Последняя строка здесь важна, поскольку «объявленный корневой тип» относится к Material
классу, но не к Activity
классу, что, вероятно, является исходной ошибкой.
Опять же, когда я активирую только второй вариант использования перед запуском приложения, все работает нормально. Поэтому я предполагаю, что процессор «Material» каким-то образом вмешивается в процессор «Activities» (или его сериализатор), но я не знаю, когда и где.
Настройка
1.) пример использования: «Материалы»
- один входной поток -> преобразование -> один выходной поток
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {...}
application.properties
spring.cloud.stream.kafka.streams.binder.functions.processMaterials.applicationId=MaterialsAppId
spring.cloud.stream.bindings.processMaterials-in-0.destination=inputMaterialsRaw
spring.cloud.stream.bindings.processMaterials-out-0.destination=outputMaterials
2.) пример использования: «Действия»
- два входных потока -> объединение -> один выходной поток
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {...}
application.properties
spring.cloud.stream.kafka.streams.binder.functions.processActivities.applicationId=ActivitiesAppId
spring.cloud.stream.bindings.processActivities-in-0.destination=inputActivitiesRaw
spring.cloud.stream.bindings.processActivities-in-1.destination=inputAssignees
spring.cloud.stream.bindings.processActivities-out-0.destination=outputActivities
Два процессора также определены как потоковая функция в application.properties
: spring.cloud.stream.function.definition=processActivities;processMaterials
Спасибо!
Обновление — Вот как я использую процессоры в коде:
Реализация
// Material model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MaterialRaw {
private String id;
private String name;
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Material {
private String id;
private String name;
}
// Material processor
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {
return materialsRawStream -> materialsRawStream .map((recordKey, materialRaw) -> {
// some transformation
final var newId = materialRaw.getId() "---foo";
final var newName = materialRaw.getName() "---bar";
final var material = new Material(newId, newName);
// output
return new KeyValue<>(recordKey, material);
};
}
// Activity model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ActivityRaw {
private String id;
private String name;
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Assignee {
private String id;
private String assignedAt;
}
/**
* Combination of `ActivityRaw` and `Assignee`
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Activity {
private String id;
private Integer number;
private String assignedAt;
}
// Activity processor
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {
return (activitiesRawStream, assigneesStream) -> {
final var joinWindow = JoinWindows.of(Duration.ofDays(30));
final var streamJoined = StreamJoined.with(
Serdes.String(),
new JsonSerde<>(ActivityRaw.class),
new JsonSerde<>(Assignee.class)
);
final var joinedStream = activitiesRawStream.leftJoin(
assigneesStream,
new ActivityJoiner(),
joinWindow,
streamJoined
);
final var mappedStream = joinedStream.map((recordKey, activity) -> {
return new KeyValue<>(recordKey, activity);
});
return mappedStream;
};
}
Комментарии:
1. не могли бы вы поделиться MaterialRaw и Material model, чтобы эту проблему можно было воспроизвести локально?
2. @GovindaSakhare Сделано, но я думаю, что это как-то связано с процессорами / Spring, а не с самой моделью.
3. Любопытно. Как вы это исправили?
4. @GovindaSakhare «Готово» предназначалось для обновления сообщения SO с кодом, к сожалению, не для устранения проблемы.
5. @BennettDams Не могли бы вы опубликовать код для ActivityJoiner? Я думаю, что это отсутствует в приведенном выше фрагменте кода.
Ответ №1:
Оказывается, это проблема с тем, как связующее определяет Serde
типы, когда существует несколько функций с разными исходящими целевыми типами, один с Activity
, а другой с Material
в вашем случае. Нам нужно будет решить эту проблему в binder. Я создал проблему здесь.
В то же время, вы можете следовать этому обходному пути.
Создайте пользовательский Serde
класс, как показано ниже.
public class ActivitySerde extends JsonSerde<Activity> {}
Затем явно используйте это Serde
для исходящей вашей processActivities
функции с использованием конфигурации.
Например,,
spring.cloud.stream.kafka.streams.bindings.processActivities-out-0.producer.valueSerde=com.example.so65003575.ActivitySerde
Пожалуйста, измените пакет на соответствующий, если вы пытаетесь это обойти.
Вот еще один рекомендуемый подход. Если вы определяете компонент типа Serde
с целевым типом, который имеет приоритет, поскольку связующее будет выполнять сопоставление с KStream
типом. Следовательно, вы также можете сделать это, не определяя этот дополнительный класс в приведенном выше обходном пути.
@Bean
public Serde<Activity> activitySerde() {
return new JsonSerde(Activity.class);
}
Вот документы, в которых объясняются все эти детали.
Комментарии:
1. Это было оно! Поскольку я новичок в Kafka amp; Spring Cloud Stream: не могли бы вы сказать мне, делаю ли я что-то особенное? Я просто смущен тем, что это новая ошибка, поскольку я думал, что следовал основным принципам. Есть ли лучший / более распространенный способ сделать то, что я делаю?
2. Если вы строго полагаетесь на возможности вывода связующего для
Serde
типов, это ошибка. Однако, поскольку существуют способы решения этой проблемы, вы можете захотеть прибегнуть к этим обходным путям (я также обновляю ответ другим обходным путем при сбоеSerde
вывода). Мы попытаемся придумать неявный способ справиться с этим, хотя, поскольку вы столкнулись с этим.3. Обновил ответ другим рекомендуемым подходом.
Ответ №2:
Вам нужно указать, какой связующий файл использовать для каждой функции s.c.s.bindings.xxx.binder=...
.
Однако без этого я бы ожидал ошибки типа «найдено несколько связующих, но не указано значение по умолчанию», что и происходит с связующими канала сообщений.
Комментарии:
1. Не могли бы вы быть более конкретными? Как настроить «какой связующий использовать»? Прямо сейчас типы для ввода и вывода де- / сериализуются с помощью выведенных типов функции
processActivities
andprocessMaterials
, поэтому я не уверен, что и как установить с помощьюs.c.s.bindings.xxx.binder=...
.2. Я был введен в заблуждение вашим
>which each have their own binder
и думал, что вы определили 2 связующих для каждого docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/… Хотя это говорит о разных типах связующих, та же концепция применима к связующим того же типа.3. Извините! Проблема со связующим была только моей необразованной догадкой. Реальной проблемой является сериализация выходных данных, которая работает только тогда, когда активен только ОДИН процессор. Если у вас есть другая идея, почему это так, я с радостью приму ее.