#java #apache-kafka #apache-kafka-streams
Вопрос:
У меня есть небольшое приложение для подсчета количества цветов с помощью Apache Kafka —
public class FavouriteColor { private static final String INPUT_TOPIC_NAME = "favourite-colour-input"; private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output"; private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output"; private static final String APPLICATION_ID = "favourite-colour-java"; public static void main(String[] args) { Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); StreamsBuilder builder = new StreamsBuilder(); KStreamlt;String, Stringgt; textLines = builder.stream(INPUT_TOPIC_NAME); KStreamlt;String, Stringgt; usersAndColours = textLines .filter((key, value) -gt; value.contains(",")) .selectKey((key, value) -gt; value.split(",")[0].toLowerCase()) .mapValues(value -gt; value.split(",")[1].toLowerCase()) .filter((user, colour) -gt; Arrays.asList("green", "blue", "red").contains(colour)); usersAndColours.to(INTERMEDIATE_TOPIC_NAME); KTablelt;String, Stringgt; usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME); KTablelt;String, Longgt; favouriteColours = usersAndColoursTable .groupBy((user, colour) -gt; new KeyValuelt;gt;(colour, colour)) .count(Named.as("CountsByColours")); favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long())); KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); streams.start(); System.out.println(streams); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
Темы создаются, и производители/ потребители запускаются с помощью терминала:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact kafka-console-consumer --bootstrap-server localhost:9092 --topic favourite-colour-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input
Я ввел в терминал следующие входные данные:
stephane,blue john,green stephane,red alice,red
Я получил ошибку в терминале потребителя:
stephane Processed a total of 1 messages [2021-11-27 21:31:58,155] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8 at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26) at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519) at scala.Option.map(Option.scala:242) at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519) at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568) at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115) at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
В чем здесь проблема? Я провел краткое исследование и нашел аналогичные вопросы, задаваемые другими людьми, но, похоже, решения не работают для меня.
Ответ №1:
Вы определили десериализатор значений таким надолго, но похоже, что вместо этого ваши данные представляют собой строку.