#java #hadoop #spark-streaming
#java #hadoop #потоковая передача spark
Вопрос:
Я хочу реализовать контрольную точку с приложением потоковой передачи файлов spark для обработки всех необработанных файлов из hadoop, если в любом случае мое приложение потоковой передачи spark остановится / завершится. Я следую этому: руководство по программированию потоковой передачи, но не нашел JavaStreamingContextFactory. Пожалуйста, помогите мне, что мне делать.
Мой код
public class StartAppWithCheckPoint {
public static void main(String[] args) {
try {
String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
jssc.checkpoint(checkpointDirectory);
return jssc;
}
};
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
context.start();
context.awaitTermination();
context.close();
sparkSession.close();
} catch(Exception e) {
e.printStackTrace();
}
}
}
Ответ №1:
Вы должны использовать контрольные точки
Для определения контрольных точек используйте преобразования с учетом состояния либо updateStateByKey
или reduceByKeyAndWindow
. В spark есть множество примеров — примеры предоставляются вместе с предварительно созданным spark и исходным кодом spark в git-hub. Для вашего конкретного см. JavaStatefulNetworkWordCount.java;