#apache-kafka #apache-kafka-streams
#apache-kafka #apache-kafka-streams
Вопрос:
Я новичок в Kafka и создаю начальный проект, используя Twitter API в качестве источника данных. Я создал производителя, который может запрашивать API Twitter и отправляет данные в мою тему kafka с помощью сериализатора строк как для ключа, так и для значения. Мое приложение Kafka Stream считывает эти данные и подсчитывает количество слов, а также группирует по дате твита. Эта часть выполняется с помощью KTable под названием wordCounts, чтобы использовать его функциональность upsert. Структура этой KTable такова:
Ключ: {слово: exampleWord, дата: exampleDate}, значение: numberOfOccurences
Затем я пытаюсь реструктурировать данные в потоке KTable в плоскую структуру, чтобы позже я мог отправить их в базу данных. Вы можете увидеть это в объекте wordCountsStructured KStream. Это реструктурирует данные, чтобы они выглядели как структура ниже. Значение изначально является JSONObject, но я преобразую его в строку, чтобы соответствовать Serdes, которые я установил.
Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurences}
Однако, когда я пытаюсь отправить это в мою вторую тему kafka, я получаю сообщение об ошибке ниже.
Сериализатор (ключ: org.apache.kafka.common.serialization.StringSerializer / значение: org.apache.kafka.common.serialization.StringSerializer) несовместим с фактическим типом ключа или значения (тип ключа: com.google.gson.JSONObject / тип значения: com.google.gson.JSONObject). Измените Serdes по умолчанию в StreamConfig или укажите правильные Serdes через параметры метода.
Меня это смущает, поскольку KStream, который я отправляю в тему, имеет тип <String, String>
. Кто-нибудь знает, как я мог бы это исправить?
public class TwitterWordCounter {
private final JsonParser jsonParser = new JsonParser();
public Topology createTopology(){
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("test-topic2");
KTable<JsonObject, Long> wordCounts = textLines
//parse each tweet as a tweet object
.mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
//map each tweet object to a list of json objects, each of which containing a word from the tweet and the date of the tweet
.flatMapValues(TwitterWordCounter::tweetWordDateMapper)
//update the key so it matches the word-date combination so we can do a groupBy and count instances
.selectKey((key, wordDate) -> wordDate)
.groupByKey()
.count(Materialized.as("Counts"));
/*
In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
so we have to:
1. take the columns which include the dimensional data and put this into the value of the stream.
2. lable the count with 'count' as the column name
*/
KStream<String, String> wordCountsStructured = wordCounts.toStream()
.map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));
KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
(key, value) -> System.out.println("key: " key "value:" value)
);
wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
TwitterWordCounter wordCountApp = new TwitterWordCounter();
KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
streams.start();
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
//this method is used for taking a tweet and transforming it to a representation of the words in it plus the date
public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
try{
List<String> words = Arrays.asList(tweet.tweetText.split("\W "));
List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
for(String word: words) {
JsonObject tweetJson = new JsonObject();
tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
tweetJson.add("word", new JsonPrimitive(word));
tweetsJson.add(tweetJson);
}
return tweetsJson;
}
catch (Exception e) {
System.out.println(e);
System.out.println(tweet.serialize().toString());
return new ArrayList<JsonObject>();
}
}
public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
key.addProperty("count", countOfWord); //new JsonPrimitive(count));
return key;
}
Ответ №1:
Поскольку вы выполняете операцию изменения ключа перед groupBy(), она создаст раздел перераспределения и для этого раздела будет использовать ключ по умолчанию, значение serdes, для которого вы установили значение String Serde .
Вы можете изменить groupBy()
вызов groupBy(Grouped.with(StringSerde,JsonSerde)
, и это должно помочь.