Как заставить Serdes работать с многоступенчатыми потоками kafka

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

Я новичок в Kafka и создаю начальный проект, используя Twitter API в качестве источника данных. Я создал производителя, который может запрашивать API Twitter и отправляет данные в мою тему kafka с помощью сериализатора строк как для ключа, так и для значения. Мое приложение Kafka Stream считывает эти данные и подсчитывает количество слов, а также группирует по дате твита. Эта часть выполняется с помощью KTable под названием wordCounts, чтобы использовать его функциональность upsert. Структура этой KTable такова:

Ключ: {слово: exampleWord, дата: exampleDate}, значение: numberOfOccurences

Затем я пытаюсь реструктурировать данные в потоке KTable в плоскую структуру, чтобы позже я мог отправить их в базу данных. Вы можете увидеть это в объекте wordCountsStructured KStream. Это реструктурирует данные, чтобы они выглядели как структура ниже. Значение изначально является JSONObject, но я преобразую его в строку, чтобы соответствовать Serdes, которые я установил.

 Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurences}
  

Однако, когда я пытаюсь отправить это в мою вторую тему kafka, я получаю сообщение об ошибке ниже.

Сериализатор (ключ: org.apache.kafka.common.serialization.StringSerializer / значение: org.apache.kafka.common.serialization.StringSerializer) несовместим с фактическим типом ключа или значения (тип ключа: com.google.gson.JSONObject / тип значения: com.google.gson.JSONObject). Измените Serdes по умолчанию в StreamConfig или укажите правильные Serdes через параметры метода.

Меня это смущает, поскольку KStream, который я отправляю в тему, имеет тип <String, String> . Кто-нибудь знает, как я мог бы это исправить?

 public class TwitterWordCounter {

private final JsonParser jsonParser = new JsonParser();

public Topology createTopology(){
    StreamsBuilder builder = new StreamsBuilder();


    KStream<String, String> textLines = builder.stream("test-topic2");
    KTable<JsonObject, Long> wordCounts = textLines
            //parse each tweet as a tweet object
            .mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
            //map each tweet object to a list of json objects, each of which containing a word from the tweet and the date of the tweet
            .flatMapValues(TwitterWordCounter::tweetWordDateMapper)
            //update the key so it matches the word-date combination so we can do a groupBy and count instances
            .selectKey((key, wordDate) -> wordDate)
            .groupByKey()
            .count(Materialized.as("Counts"));

    /*
        In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
        so we have to:
         1. take the columns which include the dimensional data and put this into the value of the stream.
         2. lable the count with 'count' as the column name
     */
    KStream<String, String> wordCountsStructured = wordCounts.toStream()
            .map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));

    KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
            (key, value) -> System.out.println("key: "   key   "value:"   value)
    );

    wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
}

public static void main(String[] args) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    TwitterWordCounter wordCountApp = new TwitterWordCounter();

    KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
    streams.start();

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}

//this method is used for taking a tweet and transforming it to a representation of the words in it plus the date
public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
    try{

        List<String> words = Arrays.asList(tweet.tweetText.split("\W "));
        List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
        for(String word: words) {
            JsonObject tweetJson = new JsonObject();
            tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
            tweetJson.add("word", new JsonPrimitive(word));
            tweetsJson.add(tweetJson);
        }

        return tweetsJson;
    }
    catch (Exception e) {
        System.out.println(e);
        System.out.println(tweet.serialize().toString());
        return new ArrayList<JsonObject>();
    }

}

public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
    key.addProperty("count", countOfWord); //new JsonPrimitive(count));
    return key;
}
  

Ответ №1:

Поскольку вы выполняете операцию изменения ключа перед groupBy(), она создаст раздел перераспределения и для этого раздела будет использовать ключ по умолчанию, значение serdes, для которого вы установили значение String Serde .

Вы можете изменить groupBy() вызов groupBy(Grouped.with(StringSerde,JsonSerde) , и это должно помочь.