Spring Cloud Kafka: не удается сериализовать данные для выходного потока, когда активны два процессора

#java #spring #apache-kafka #spring-kafka #spring-cloud-stream

#java #spring #apache-kafka #spring-kafka #spring-cloud-stream

Вопрос:

У меня есть рабочая настройка для потоков Spring Cloud Kafka с функциональным стилем программирования. Существует два варианта использования, которые настраиваются через application.properties . Оба они работают по отдельности, но как только я активирую оба одновременно, я получаю ошибку сериализации для выходного потока второго варианта использования:

 Exception in thread "ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Error encountered sending record to topic outputActivities for task 0_0 due to:
...
Caused by: org.apache.kafka.common.errors.SerializationException:
Can't serialize data [com.example.connector.model.Activity@497b37ff] for topic [outputActivities]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Incompatible types: declared root type ([simple type, class com.example.connector.model.Material]) vs com.example.connector.model.Activity
  

Последняя строка здесь важна, поскольку «объявленный корневой тип» относится к Material классу, но не к Activity классу, что, вероятно, является исходной ошибкой.

Опять же, когда я активирую только второй вариант использования перед запуском приложения, все работает нормально. Поэтому я предполагаю, что процессор «Material» каким-то образом вмешивается в процессор «Activities» (или его сериализатор), но я не знаю, когда и где.


Настройка

1.) пример использования: «Материалы»

  • один входной поток -> преобразование -> один выходной поток
 @Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {...}
  

application.properties

 spring.cloud.stream.kafka.streams.binder.functions.processMaterials.applicationId=MaterialsAppId
spring.cloud.stream.bindings.processMaterials-in-0.destination=inputMaterialsRaw
spring.cloud.stream.bindings.processMaterials-out-0.destination=outputMaterials
  

2.) пример использования: «Действия»

  • два входных потока -> объединение -> один выходной поток
 @Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {...}
  

application.properties

 spring.cloud.stream.kafka.streams.binder.functions.processActivities.applicationId=ActivitiesAppId
spring.cloud.stream.bindings.processActivities-in-0.destination=inputActivitiesRaw
spring.cloud.stream.bindings.processActivities-in-1.destination=inputAssignees
spring.cloud.stream.bindings.processActivities-out-0.destination=outputActivities
  

Два процессора также определены как потоковая функция в application.properties : spring.cloud.stream.function.definition=processActivities;processMaterials

Спасибо!

Обновление — Вот как я использую процессоры в коде:

Реализация

 // Material model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MaterialRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Material {
    private String id;
    private String name;
}

// Material processor
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {
    return materialsRawStream -> materialsRawStream .map((recordKey, materialRaw) -> {
        // some transformation
        final var newId = materialRaw.getId()   "---foo";
        final var newName = materialRaw.getName()   "---bar";
        final var material = new Material(newId, newName);

        // output
        return new KeyValue<>(recordKey, material); 
    };
}
  
 // Activity model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ActivityRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Assignee {
    private String id;
    private String assignedAt;
}

/**
 * Combination of `ActivityRaw` and `Assignee`
 */
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Activity {
    private String id;
    private Integer number;
    private String assignedAt;
}

// Activity processor
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {
    return (activitiesRawStream, assigneesStream) -> { 
        final var joinWindow = JoinWindows.of(Duration.ofDays(30));

        final var streamJoined = StreamJoined.with(
            Serdes.String(),
            new JsonSerde<>(ActivityRaw.class),
            new JsonSerde<>(Assignee.class)
        );

        final var joinedStream = activitiesRawStream.leftJoin(
            assigneesStream,
            new ActivityJoiner(),
            joinWindow,
            streamJoined
        );

        final var mappedStream = joinedStream.map((recordKey, activity) -> {
            return new KeyValue<>(recordKey, activity);
        });

        return mappedStream;
    };
}
  

Комментарии:

1. не могли бы вы поделиться MaterialRaw и Material model, чтобы эту проблему можно было воспроизвести локально?

2. @GovindaSakhare Сделано, но я думаю, что это как-то связано с процессорами / Spring, а не с самой моделью.

3. Любопытно. Как вы это исправили?

4. @GovindaSakhare «Готово» предназначалось для обновления сообщения SO с кодом, к сожалению, не для устранения проблемы.

5. @BennettDams Не могли бы вы опубликовать код для ActivityJoiner? Я думаю, что это отсутствует в приведенном выше фрагменте кода.

Ответ №1:

Оказывается, это проблема с тем, как связующее определяет Serde типы, когда существует несколько функций с разными исходящими целевыми типами, один с Activity , а другой с Material в вашем случае. Нам нужно будет решить эту проблему в binder. Я создал проблему здесь.

В то же время, вы можете следовать этому обходному пути.

Создайте пользовательский Serde класс, как показано ниже.

 public class ActivitySerde extends JsonSerde<Activity> {}
  

Затем явно используйте это Serde для исходящей вашей processActivities функции с использованием конфигурации.

Например,,

 spring.cloud.stream.kafka.streams.bindings.processActivities-out-0.producer.valueSerde=com.example.so65003575.ActivitySerde
  

Пожалуйста, измените пакет на соответствующий, если вы пытаетесь это обойти.

Вот еще один рекомендуемый подход. Если вы определяете компонент типа Serde с целевым типом, который имеет приоритет, поскольку связующее будет выполнять сопоставление с KStream типом. Следовательно, вы также можете сделать это, не определяя этот дополнительный класс в приведенном выше обходном пути.

 @Bean
public Serde<Activity> activitySerde() {
  return new JsonSerde(Activity.class);
}
  

Вот документы, в которых объясняются все эти детали.

Комментарии:

1. Это было оно! Поскольку я новичок в Kafka amp; Spring Cloud Stream: не могли бы вы сказать мне, делаю ли я что-то особенное? Я просто смущен тем, что это новая ошибка, поскольку я думал, что следовал основным принципам. Есть ли лучший / более распространенный способ сделать то, что я делаю?

2. Если вы строго полагаетесь на возможности вывода связующего для Serde типов, это ошибка. Однако, поскольку существуют способы решения этой проблемы, вы можете захотеть прибегнуть к этим обходным путям (я также обновляю ответ другим обходным путем при сбое Serde вывода). Мы попытаемся придумать неявный способ справиться с этим, хотя, поскольку вы столкнулись с этим.

3. Обновил ответ другим рекомендуемым подходом.

Ответ №2:

Вам нужно указать, какой связующий файл использовать для каждой функции s.c.s.bindings.xxx.binder=... .

Однако без этого я бы ожидал ошибки типа «найдено несколько связующих, но не указано значение по умолчанию», что и происходит с связующими канала сообщений.

Комментарии:

1. Не могли бы вы быть более конкретными? Как настроить «какой связующий использовать»? Прямо сейчас типы для ввода и вывода де- / сериализуются с помощью выведенных типов функции processActivities and processMaterials , поэтому я не уверен, что и как установить с помощью s.c.s.bindings.xxx.binder=... .

2. Я был введен в заблуждение вашим >which each have their own binder и думал, что вы определили 2 связующих для каждого docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/… Хотя это говорит о разных типах связующих, та же концепция применима к связующим того же типа.

3. Извините! Проблема со связующим была только моей необразованной догадкой. Реальной проблемой является сериализация выходных данных, которая работает только тогда, когда активен только ОДИН процессор. Если у вас есть другая идея, почему это так, я с радостью приму ее.