#java #apache-kafka #apache-kafka-streams #confluent-schema-registry
#java #апачи-кафка #apache-kafka-потоки #слияние-схема-реестр
Вопрос:
Я создаю приложение Kafka Streams, и мои тематические данные поступают из Protobuf. Для этого мы можем создавать привязки кода Java. Тем не менее, я изо всех сил пытаюсь использовать правильный serde для использования моих данных из темы. Может кто-нибудь, пожалуйста, поделиться, что плохого я здесь делаю.
Ниже приведено определение свойств, которое я использую:
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id-config");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker:my-port");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Мой класс Serde
public class AppSerdes extends Serdes {
public static KafkaProtobufSerde<ProtobufClass1> createConfiguredSerde1() {
KafkaProtobufSerde<ProtobufClass1> serde = new KafkaProtobufSerde<ProtobufClass1>();
Map<String, Object> serdeConfig = getSerdeConfig();
serde.configure(serdeConfig, false);
return serde;
}
public static KafkaProtobufSerde<ProtobufClass2> createConfiguredSerde2() {
KafkaProtobufSerde<ProtobufClass2> serde = new KafkaProtobufSerde<ProtobufClass2>();
Map<String, Object> serdeConfig = getSerdeConfig();
serde.configure(serdeConfig, false);
return serde;
}
private static Map<String, Object> getSerdeConfig() {
Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
return serdeConfig;
}
}
И вот как я создаю экземпляры KStream и KTable:
StreamsBuilder streamBuilder = new StreamsBuilder();
KTable<String, ProtobufClass1> table = streamBuilder.table("topic1",
Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde1()));
KStream<String, ProtobufClass2> stream = streamBuilder.stream("topic2".
Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde2()));
Тем не менее, я получаю сообщение об ошибке ниже:
org.apache.kafka.streams.ошибки.StreamsException: ClassCastException, вызывающий процессор. Соответствуют ли типы входных данных процессора десериализованным типам? Проверьте настройку Serde и измените Serde по умолчанию в StreamConfig или укажите правильные Serde с помощью параметров метода. Убедитесь, что процессор может принимать десериализованный ввод типа key: java.lang.Строка и значение: com.google.protobuf.DynamicMessage. Обратите внимание, что, хотя неправильные Serde являются частой причиной ошибки, приведенное исключение может иметь другую причину (например, в пользовательском коде). Например, если процессор подключается к хранилищу, но неправильно генерирует обобщения, во время обработки может возникнуть исключение приведения класса, но причиной не будет неправильный Serde. в org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java: 273) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java: 252) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java: 219) в org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) в org.apache.kafka.streams.processor.internals.StreamTask.lambda$ обрабатывает $ 1 (StreamTask.java:703) в org.apache.кафка.потоки.процессор.внутренности.метрики.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) в org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java: 703) в org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java: 1105) в org.apache.kafka.streams.processor.internals.StreamThread.RunOnce(StreamThread.java:647) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) в org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java: 512) Вызвано: java.lang.Исключение ClassCastException: com.google.protobuf.DynamicMessage не может быть передано в iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity в org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda $internalSelectKey $ 0(KStreamImpl.java:234) в org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) в org.апачи.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) в org.apache.kafka.streams.processor.internals.ProcessorNode.lambda $process $2 (ProcessorNode.java:181) в org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) в org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) … еще 11
Комментарии:
1. Я никогда не использовал protobuf. Но там сказано
com.google.protobuf.DynamicMessage cannot be cast to iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity
, что я думаю, что попытался бы выяснить, что это за классcom.google.protobuf.DynamicMessage
.2. У меня такая же ошибка
DynamicMessage cannot be cast to
Ответ №1:
Я могу решить эту проблему, изменив это с
Map<String, Object> serdeConfig = getSerdeConfig();
Для
Map<String, String> serdeConfig = getSerdeConfig();
В качестве ключа и значения для этой карты используются обе строки.