Контрольная точка с потоковой передачей файлов spark на java

#java #hadoop #spark-streaming

#java #hadoop #потоковая передача spark

Вопрос:

Я хочу реализовать контрольную точку с приложением потоковой передачи файлов spark для обработки всех необработанных файлов из hadoop, если в любом случае мое приложение потоковой передачи spark остановится / завершится. Я следую этому: руководство по программированию потоковой передачи, но не нашел JavaStreamingContextFactory. Пожалуйста, помогите мне, что мне делать.

Мой код

 public class StartAppWithCheckPoint {

    public static void main(String[] args) {
        
        try {
            
            String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/"; 
            String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();

            JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
                  @Override public JavaStreamingContext create() {
                      
                    SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
                    JavaSparkContext sc = new JavaSparkContext(sparkConf);  
                    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
                    JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
                    
                    jssc.checkpoint(checkpointDirectory);
                    return jssc;
                  }
                };
                
            JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
            
            context.start();
            context.awaitTermination();
            context.close();
            sparkSession.close();
            
        } catch(Exception e) {
            e.printStackTrace();
        }   
    }
}
  

Ответ №1:

Вы должны использовать контрольные точки

Для определения контрольных точек используйте преобразования с учетом состояния либо updateStateByKey или reduceByKeyAndWindow . В spark есть множество примеров — примеры предоставляются вместе с предварительно созданным spark и исходным кодом spark в git-hub. Для вашего конкретного см. JavaStatefulNetworkWordCount.java;