#java #apache-flink
#java #apache-flink
Вопрос:
Я новичок в flink, и я пытаюсь прочитать поток из kafka, однако я обрабатываю дублирующиеся данные, и мне интересно, почему?
Я знаю, что проблема возникла из flink, потому что, когда я писал простого потребителя на Java, я не получал повторяющихся данных
flink-connector-kafka_2.11 версия 1.10.0
flink версия 1.11
есть ли какие-либо проблемы с проверкой, обрабатывает ли flink только один раз данные, предоставленные kafka?
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaConsumer consumer = new KafkaConsumer("fashion","172.16.3.241:9092","fashion","org.apache.kafka.common.serialization.ByteBufferDeserializer");
FlinkKafkaConsumer<JsonNode> stream_consumer = new FlinkKafkaConsumer<>(consumer.getTopic(), new DeserializationSchema<JsonNode>() {
private final ObjectMapper objMapper = new ObjectMapper();
@Override
public JsonNode deserialize(byte[] bytes) throws IOException {
return objMapper.readValue(bytes,JsonNode.class);
}
@Override
public boolean isEndOfStream(JsonNode jsonNode) {
return false;
}
@Override
public TypeInformation<JsonNode> getProducedType() {
return TypeExtractor.getForClass(JsonNode.class);
}
}, consumer.getProperties());
DataStream<JsonNode> tweets = env.addSource(stream_consumer);
tweets.flatMap(new getTweetSchema());
env.execute("Flink Streaming Java API Skeleton");
}
private static class getTweetSchema implements FlatMapFunction<JsonNode, Tweet>{
private static final long serialVersionUID = -6867736771747690202L;
private JSONObject objTweet;
public void flatMap(JsonNode tweet, Collector<Tweet> out) throws JSONException, ParseException{
try{
if (objTweet == null){
objTweet = new JSONObject(tweet.asText());
}
HashSet<String> hashtag = new HashSet<>();
String text = objTweet.get("text").toString();
DateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy", Locale.ENGLISH );
Date created_at = dateFormat.parse(objTweet.get("created_at").toString());
String source = objTweet.get("source").toString();
source = source.substring(source.length() - 11).replaceAll("</a>","");
String lang = objTweet.get("lang").toString();
Boolean isRT = text.matches("^RT.*");
Long id = Long.parseLong(objTweet.get("id").toString());
if (objTweet.has("extended_tweet")){
JSONArray arr = objTweet.getJSONObject("extended_tweet").getJSONObject("entities").getJSONArray("hashtags");
if(!(arr.isEmpty())){
for(int i = 0; i< arr.length();i ){
hashtag.add(arr.getJSONObject(i).get("text").toString());
}
System.out.println(arr);
}
}
out.collect(new Tweet(id, text,created_at,source,lang,isRT,hashtag));
}catch (JSONException | ParseException e){
System.out.println("e");
throw e;
}
}
}
Комментарии:
1. (1) Похоже, вы используете неправильную версию соединителя — вам нужно использовать соединитель 1.11.x с Flink 1.11.x. (2) Я не вижу приемника. Для каждого задания Flink требуется хотя бы один приемник. (3) Как вы заметили, что существует дублирование?
2. да, я забыл добавить .print(), я попробую версию соединителя и посмотрю, работает ли она, спасибо!!
3. это не меняется, всегда с одной и той же проблемой @DavidAnderson