Почему я получаю дублированные данные из flink?

#java #apache-flink

#java #apache-flink

Вопрос:

Я новичок в flink, и я пытаюсь прочитать поток из kafka, однако я обрабатываю дублирующиеся данные, и мне интересно, почему?

Я знаю, что проблема возникла из flink, потому что, когда я писал простого потребителя на Java, я не получал повторяющихся данных

flink-connector-kafka_2.11 версия 1.10.0
flink версия 1.11

есть ли какие-либо проблемы с проверкой, обрабатывает ли flink только один раз данные, предоставленные kafka?

     public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaConsumer consumer = new KafkaConsumer("fashion","172.16.3.241:9092","fashion","org.apache.kafka.common.serialization.ByteBufferDeserializer");
        FlinkKafkaConsumer<JsonNode> stream_consumer = new FlinkKafkaConsumer<>(consumer.getTopic(), new DeserializationSchema<JsonNode>() {
            private final ObjectMapper objMapper = new ObjectMapper();
            @Override
            public JsonNode deserialize(byte[] bytes) throws IOException {
                return objMapper.readValue(bytes,JsonNode.class);
            }

            @Override
            public boolean isEndOfStream(JsonNode jsonNode) {
                return false;
            }

            @Override
            public TypeInformation<JsonNode> getProducedType() {
                return TypeExtractor.getForClass(JsonNode.class);
            }
        }, consumer.getProperties());
        DataStream<JsonNode> tweets = env.addSource(stream_consumer);
        tweets.flatMap(new getTweetSchema());
        env.execute("Flink Streaming Java API Skeleton");

    }
  
 private static class getTweetSchema implements FlatMapFunction<JsonNode, Tweet>{
        private static final long serialVersionUID = -6867736771747690202L;
        private JSONObject objTweet;
        public void flatMap(JsonNode tweet, Collector<Tweet> out) throws JSONException, ParseException{
            try{
                if (objTweet == null){
                    objTweet = new JSONObject(tweet.asText());
                }
                HashSet<String> hashtag = new HashSet<>();
                String text = objTweet.get("text").toString();
                DateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy", Locale.ENGLISH );
                Date created_at = dateFormat.parse(objTweet.get("created_at").toString());
                String source = objTweet.get("source").toString();
                source = source.substring(source.length() - 11).replaceAll("</a>","");
                String lang = objTweet.get("lang").toString();
                Boolean isRT = text.matches("^RT.*");
                Long id = Long.parseLong(objTweet.get("id").toString());
                if (objTweet.has("extended_tweet")){
                    JSONArray arr = objTweet.getJSONObject("extended_tweet").getJSONObject("entities").getJSONArray("hashtags");
                    if(!(arr.isEmpty())){
                        for(int i = 0; i< arr.length();i  ){
                            hashtag.add(arr.getJSONObject(i).get("text").toString());
                        }
                        System.out.println(arr);
                    }
                }
                out.collect(new Tweet(id, text,created_at,source,lang,isRT,hashtag));
            }catch (JSONException | ParseException e){
                System.out.println("e");
                throw e;
            }
        }
    }
  

Комментарии:

1. (1) Похоже, вы используете неправильную версию соединителя — вам нужно использовать соединитель 1.11.x с Flink 1.11.x. (2) Я не вижу приемника. Для каждого задания Flink требуется хотя бы один приемник. (3) Как вы заметили, что существует дублирование?

2. да, я забыл добавить .print(), я попробую версию соединителя и посмотрю, работает ли она, спасибо!!

3. это не меняется, всегда с одной и той же проблемой @DavidAnderson