#apache-flink #flink-streaming
#apache-flink #flink-потоковая передача
Вопрос:
У меня есть скрипт Flink на Java с разъемом Kafka. Я получаю данные из Kafka без проблем, первый шаг, который я делаю .map, чтобы получить временную метку из сообщений. Чтобы использовать временные окна событий, я извлек из данных временную метку в миллисекундах и вернул ее flink. Для этого я использовал «assignTimestampsAndWatermarks»
DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer("CorID_0", new SimpleStringSchema(), p));
kafkaData.map(new MapFunction<
String, Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double>>()
{
public Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double> map(String value)
{
String[] words = value.split(",");
return new Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double>
(words[0], words[1], words[2], words[3], words[4], words[5], Double.parseDouble(words[6]),
Long.parseLong(words[7]), Double.parseDouble(words[8]), Long.parseLong(words[9]),
Long.parseLong(words[10]), Integer.parseInt(words[11]),
Long.parseLong(words[12]), Double.parseDouble(words[13]),
Long.parseLong(words[14]), Double.parseDouble(words[15]),
Double.parseDouble(words[16]), Integer.parseInt(words[17]),
Double.parseDouble(words[18]));
}
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double>>()
{
private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
public long extractAscendingTimestamp(Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double> value)
{
try
{
Timestamp ts = new Timestamp(sdf.parse(value.f3).getTime());
return ts.getTime();
} catch (Exception e)
{
throw new RuntimeException("Parsing Error");
}
}
});
Второй шаг — это начало вычисления. Я пытаюсь выполнить некоторые манипуляции с данными, и для этого мне нужно получить данные из сообщения kafka, вот где я застрял в основном.
DataStream<String> largeDelta = kafkaData .keyBy(new KeySelector<Tuple19<String,String,String,String,String,
String,Double,Long,Double,Long,
Long,Integer,Long,Double,Long,
Double,Double, Integer,Double>, String>()
{
public String getKey(Tuple19<String,String,String,String,String,
String,Double,Long,Double,Long,
Long,Integer,Long,Double,Long,
Double,Double, Integer,Double> value)
{
return value.f2;
}
})
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new TrackChanges(5));
largeDelta.writeAsText("/Alert.txt");
env.execute("ABCD");
Проблема в том, что у меня есть сообщение об ошибке, в котором говорится: «Не удается разрешить метод ‘keyBy(anonymous org.apache.flink.api.java.функции ….’
Любая помощь была бы очень кстати, поскольку я изо всех сил пытаюсь понять, чего мне не хватает.
Спасибо
Ответ №1:
Я предполагаю, что вы new MapFunction()...
преобразуете входящие String
в a Tuple2<String, String>
, так как в противном KeySelector<Tuple2<String, String>, String>
случае наличие a не имело бы смысла.
Если это так, то вам нужно присвоить результат kafkaData.map(new MapFunction<...
потоку данных<Tuple2<String, String>> бла, а затем использовать это с вашим keyBy
.
Хотя, сказав это, я не понимаю, как у вас a keyBy().window()
из a Tuple2<String, String>
в результате a DataStream<String> largeDelta
. Так что похоже на несколько проблем.
Кроме того, для простых селекторов ключей вместо определения анонимной функции используйте лямбда-выражение. Например. kafkaData.keyBy(r -> r.f1)
сделал бы это.
Комментарии:
1. Как вы правильно указали, отсутствуют недостающие части. Я попытался упростить свой вопрос, используя Tuple2 вместо tuple19, который я сейчас использую. Я отредактирую вопрос, чтобы уточнить, в чем проблема.
2. Я бы настоятельно не советовал использовать a
Tuple19
, поскольку это делает код очень трудным для понимания (и реализации исправлено — например, что именно находится.f17
в вашем кортеже?). Просто определите POJO с соответствующими получателями / установщиками.3. И мой первоначальный ответ объясняет проблему, с которой вы столкнулись. Когда вы вызываете
kafkaData.map(new MapFunction<...
, это возвращает НОВЫЙ поток типаDataStream<Tuple19<...>>
, но вы не фиксируете этот результат в локальной переменной, которую вы затем использовали бы для своего.keyBy()
.4. Спасибо! POJO имеет большой смысл, а также объяснение логично, я понял это с теоретической точки зрения. То, что я пытался сделать, это именно то, что вы предложили, но за последние пару дней мне не удалось сделать это должным образом. Как я могу .map вернуть данные обратно в flink для управления окнами и использовать эту .map для ключей. Не могли бы вы привести мне пример, в котором вы записываете внутри локальной переменной .map и используете как для работы с окнами, так и для .keyBy? Я предполагаю, что как только я получу это, я смогу использовать одну и ту же переменную для всего анализа, так что это было бы чрезвычайно полезно.