Потоки Kafka — проблема с использованием Protobuf serde

#java #apache-kafka #apache-kafka-streams #confluent-schema-registry

#java #апачи-кафка #apache-kafka-потоки #слияние-схема-реестр

Вопрос:

Я создаю приложение Kafka Streams, и мои тематические данные поступают из Protobuf. Для этого мы можем создавать привязки кода Java. Тем не менее, я изо всех сил пытаюсь использовать правильный serde для использования моих данных из темы. Может кто-нибудь, пожалуйста, поделиться, что плохого я здесь делаю.

Ниже приведено определение свойств, которое я использую:

 Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id-config");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker:my-port");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);

    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 

Мой класс Serde

 public class AppSerdes extends Serdes {

    public static KafkaProtobufSerde<ProtobufClass1> createConfiguredSerde1() {
        KafkaProtobufSerde<ProtobufClass1> serde = new KafkaProtobufSerde<ProtobufClass1>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    public static KafkaProtobufSerde<ProtobufClass2> createConfiguredSerde2() {
        KafkaProtobufSerde<ProtobufClass2> serde = new KafkaProtobufSerde<ProtobufClass2>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    private static Map<String, Object> getSerdeConfig() {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        return serdeConfig;
    }
}
 

И вот как я создаю экземпляры KStream и KTable:

 StreamsBuilder streamBuilder = new StreamsBuilder();
    KTable<String, ProtobufClass1> table = streamBuilder.table("topic1",
            Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde1()));
    KStream<String, ProtobufClass2> stream = streamBuilder.stream("topic2".
            Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde2()));
 

Тем не менее, я получаю сообщение об ошибке ниже:

org.apache.kafka.streams.ошибки.StreamsException: ClassCastException, вызывающий процессор. Соответствуют ли типы входных данных процессора десериализованным типам? Проверьте настройку Serde и измените Serde по умолчанию в StreamConfig или укажите правильные Serde с помощью параметров метода. Убедитесь, что процессор может принимать десериализованный ввод типа key: java.lang.Строка и значение: com.google.protobuf.DynamicMessage. Обратите внимание, что, хотя неправильные Serde являются частой причиной ошибки, приведенное исключение может иметь другую причину (например, в пользовательском коде). Например, если процессор подключается к хранилищу, но неправильно генерирует обобщения, во время обработки может возникнуть исключение приведения класса, но причиной не будет неправильный Serde. в org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java: 273) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java: 252) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java: 219) в org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) в org.apache.kafka.streams.processor.internals.StreamTask.lambda$ обрабатывает $ 1 (StreamTask.java:703) в org.apache.кафка.потоки.процессор.внутренности.метрики.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) в org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java: 703) в org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java: 1105) в org.apache.kafka.streams.processor.internals.StreamThread.RunOnce(StreamThread.java:647) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) в org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java: 512) Вызвано: java.lang.Исключение ClassCastException: com.google.protobuf.DynamicMessage не может быть передано в iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity в org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda $internalSelectKey $ 0(KStreamImpl.java:234) в org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) в org.апачи.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) в org.apache.kafka.streams.processor.internals.ProcessorNode.lambda $process $2 (ProcessorNode.java:181) в org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) в org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) … еще 11

Комментарии:

1. Я никогда не использовал protobuf. Но там сказано com.google.protobuf.DynamicMessage cannot be cast to iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity , что я думаю, что попытался бы выяснить, что это за класс com.google.protobuf.DynamicMessage .

2. У меня такая же ошибка DynamicMessage cannot be cast to

Ответ №1:

Я могу решить эту проблему, изменив это с

 Map<String, Object> serdeConfig = getSerdeConfig();
 

Для

 Map<String, String> serdeConfig = getSerdeConfig();
 

В качестве ключа и значения для этой карты используются обе строки.