Отказ от использования .keyBy с помощью keySelector, берущего данные из Kafka

#apache-flink #flink-streaming

#apache-flink #flink-потоковая передача

Вопрос:

У меня есть скрипт Flink на Java с разъемом Kafka. Я получаю данные из Kafka без проблем, первый шаг, который я делаю .map, чтобы получить временную метку из сообщений. Чтобы использовать временные окна событий, я извлек из данных временную метку в миллисекундах и вернул ее flink. Для этого я использовал «assignTimestampsAndWatermarks»

  DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer("CorID_0", new SimpleStringSchema(), p));

    kafkaData.map(new MapFunction<
            String, Tuple19<String, String, String, String, String,
            String, Double, Long, Double, Long,
            Long, Integer, Long, Double, Long,
            Double, Double, Integer, Double>>()
    {
        public Tuple19<String, String, String, String, String,
                String, Double, Long, Double, Long,
                Long, Integer, Long, Double, Long,
                Double, Double, Integer, Double> map(String value)
        {
            String[] words = value.split(",");
            return new Tuple19<String, String, String, String, String,
                    String, Double, Long, Double, Long,
                    Long, Integer, Long, Double, Long,
                    Double, Double, Integer, Double>
                    (words[0], words[1], words[2], words[3], words[4], words[5], Double.parseDouble(words[6]),
                            Long.parseLong(words[7]), Double.parseDouble(words[8]), Long.parseLong(words[9]),
                            Long.parseLong(words[10]), Integer.parseInt(words[11]),
                            Long.parseLong(words[12]), Double.parseDouble(words[13]),
                            Long.parseLong(words[14]), Double.parseDouble(words[15]),
                            Double.parseDouble(words[16]), Integer.parseInt(words[17]),
                            Double.parseDouble(words[18]));
        }
    })

            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple19<String, String, String, String, String,
                    String, Double, Long, Double, Long,
                    Long, Integer, Long, Double, Long,
                    Double, Double, Integer, Double>>()
            {
                private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
                
                public long extractAscendingTimestamp(Tuple19<String, String, String, String, String,
                        String, Double, Long, Double, Long,
                        Long, Integer, Long, Double, Long,
                        Double, Double, Integer, Double> value)
                {
                    try
                    {
                        Timestamp ts = new Timestamp(sdf.parse(value.f3).getTime());
                        return ts.getTime();
                    } catch (Exception e)
                    {
                        throw new RuntimeException("Parsing Error");
                    }
                }
            });
  

Второй шаг — это начало вычисления. Я пытаюсь выполнить некоторые манипуляции с данными, и для этого мне нужно получить данные из сообщения kafka, вот где я застрял в основном.

 DataStream<String> largeDelta = kafkaData .keyBy(new KeySelector<Tuple19<String,String,String,String,String,
                String,Double,Long,Double,Long,
                Long,Integer,Long,Double,Long,
                Double,Double, Integer,Double>, String>()
                {
            public String getKey(Tuple19<String,String,String,String,String,
                    String,Double,Long,Double,Long,
                    Long,Integer,Long,Double,Long,
                    Double,Double, Integer,Double> value)
                {
                return value.f2;
                }
                })

                 .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                 .process(new TrackChanges(5));
        largeDelta.writeAsText("/Alert.txt");
        env.execute("ABCD");
  

Проблема в том, что у меня есть сообщение об ошибке, в котором говорится: «Не удается разрешить метод ‘keyBy(anonymous org.apache.flink.api.java.функции ….’

введите описание изображения здесь

Любая помощь была бы очень кстати, поскольку я изо всех сил пытаюсь понять, чего мне не хватает.

Спасибо

Ответ №1:

Я предполагаю, что вы new MapFunction()... преобразуете входящие String в a Tuple2<String, String> , так как в противном KeySelector<Tuple2<String, String>, String> случае наличие a не имело бы смысла.

Если это так, то вам нужно присвоить результат kafkaData.map(new MapFunction<... потоку данных<Tuple2<String, String>> бла, а затем использовать это с вашим keyBy .

Хотя, сказав это, я не понимаю, как у вас a keyBy().window() из a Tuple2<String, String> в результате a DataStream<String> largeDelta . Так что похоже на несколько проблем.

Кроме того, для простых селекторов ключей вместо определения анонимной функции используйте лямбда-выражение. Например. kafkaData.keyBy(r -> r.f1) сделал бы это.

Комментарии:

1. Как вы правильно указали, отсутствуют недостающие части. Я попытался упростить свой вопрос, используя Tuple2 вместо tuple19, который я сейчас использую. Я отредактирую вопрос, чтобы уточнить, в чем проблема.

2. Я бы настоятельно не советовал использовать a Tuple19 , поскольку это делает код очень трудным для понимания (и реализации исправлено — например, что именно находится .f17 в вашем кортеже?). Просто определите POJO с соответствующими получателями / установщиками.

3. И мой первоначальный ответ объясняет проблему, с которой вы столкнулись. Когда вы вызываете kafkaData.map(new MapFunction<... , это возвращает НОВЫЙ поток типа DataStream<Tuple19<...>> , но вы не фиксируете этот результат в локальной переменной, которую вы затем использовали бы для своего .keyBy() .

4. Спасибо! POJO имеет большой смысл, а также объяснение логично, я понял это с теоретической точки зрения. То, что я пытался сделать, это именно то, что вы предложили, но за последние пару дней мне не удалось сделать это должным образом. Как я могу .map вернуть данные обратно в flink для управления окнами и использовать эту .map для ключей. Не могли бы вы привести мне пример, в котором вы записываете внутри локальной переменной .map и используете как для работы с окнами, так и для .keyBy? Я предполагаю, что как только я получу это, я смогу использовать одну и ту же переменную для всего анализа, так что это было бы чрезвычайно полезно.