Потоки Кафки: Исключение сериализации: Размер данных, полученных LongDeserializer, не равен 8

#java #apache-kafka #apache-kafka-streams

Вопрос:

У меня есть небольшое приложение для подсчета количества цветов с помощью Apache Kafka —

 public class FavouriteColor {    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";  private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";  private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";   private static final String APPLICATION_ID = "favourite-colour-java";    public static void main(String[] args) {   Properties config = new Properties();   config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());   config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");   StreamsBuilder builder = new StreamsBuilder();   KStreamlt;String, Stringgt; textLines = builder.stream(INPUT_TOPIC_NAME);   KStreamlt;String, Stringgt; usersAndColours = textLines  .filter((key, value) -gt; value.contains(","))  .selectKey((key, value) -gt; value.split(",")[0].toLowerCase())  .mapValues(value -gt; value.split(",")[1].toLowerCase())  .filter((user, colour) -gt; Arrays.asList("green", "blue", "red").contains(colour));   usersAndColours.to(INTERMEDIATE_TOPIC_NAME);  KTablelt;String, Stringgt; usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);   KTablelt;String, Longgt; favouriteColours = usersAndColoursTable  .groupBy((user, colour) -gt; new KeyValuelt;gt;(colour, colour))  .count(Named.as("CountsByColours"));   favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));   KafkaStreams streams = new KafkaStreams(builder.build(), config);   streams.cleanUp();  streams.start();   System.out.println(streams);   Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  } }  

Темы создаются, и производители/ потребители запускаются с помощью терминала:

 kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input  kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact  kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact    kafka-console-consumer --bootstrap-server localhost:9092   --topic favourite-colour-output   --from-beginning   --formatter kafka.tools.DefaultMessageFormatter   --property print.key=true   --property print.value=true   --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer   --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer    kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input  

Я ввел в терминал следующие входные данные:

 stephane,blue john,green stephane,red alice,red  

Я получил ошибку в терминале потребителя:

 stephane Processed a total of 1 messages [2021-11-27 21:31:58,155] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8  at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)  at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)  at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)  at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519)  at scala.Option.map(Option.scala:242)  at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519)  at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)  at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)  at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)  at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)  at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)  

В чем здесь проблема? Я провел краткое исследование и нашел аналогичные вопросы, задаваемые другими людьми, но, похоже, решения не работают для меня.

Ответ №1:

Вы определили десериализатор значений таким надолго, но похоже, что вместо этого ваши данные представляют собой строку.