потоковая передача с использованием ConstantInputDStream в java

#apache-spark #streaming

#apache-spark #потоковая передача

Вопрос:

Я пытаюсь использовать ConstantInputDStream в программе потоковой передачи Java spark, но не могу этого сделать. Я делаю:

             final SparkConf sparkConf2 = new SparkConf().setAppName("NetworkWordCount");
            final JavaStreamingContext ssc2 = new JavaStreamingContext(sparkConf2, new Duration(10000));
            final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            JavaRDD<Integer> distData = ssc2.sparkContext().parallelize(data);
            final JavaDStream<Integer> numStream = new JavaDStream<Integer>(new ConstantInputDStream<Integer>(ssc2, distData));
  

Но последнее утверждение выдает ошибку компиляции: «ошибка: конструктор ConstantInputDStream в классе ConstantInputDStream<‘T’> не может быть применен к заданным типам;»

В чем может быть проблема и как это исправить?

Комментарии:

1. попробуйте этот ConstantInputDStream(ssc2, distData)

2. вот как конструктор определяется в соответствии с общедоступным ConstantInputDStream документов (StreamingContext _ssc, RDD<T> rdd, scala.reflect.classstag<T> доказательство $1)

Ответ №1:

Вам нужно добавить

 ClassTag<Integer> classTag = ClassTag$.MODULE$.apply(Integer.class);
final JavaDStream<Integer> numStream = new JavaDStream<Integer>(new ConstantInputDStream<Integer>(ssc2, distData,classTag));
  

Ответ №2:

Я думаю, что API, возможно, изменился. Я взял ваш код в качестве отправной точки и добавил ответ Вишну, и в итоге получилось следующее:

        try (JavaStreamingContext streamCtxt = new JavaStreamingContext(sparkContext, new Duration(1000))) {
            final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            JavaRDD<Integer> distData = streamCtxt.sparkContext().parallelize(data);
            ClassTag<Integer> evidence = ClassTag$.MODULE$.apply(Integer.class);
            ConstantInputDStream<Integer> integerConstantInputDStream =
                    new ConstantInputDStream<>(streamCtxt.ssc(), distData.rdd(), evidence);
            ArrayList<Integer> list = new ArrayList();
            final JavaDStream<Integer> javaDStream = JavaDStream.fromDStream(integerConstantInputDStream, evidence);
            javaDStream.foreachRDD(r -> list.addAll(r.collect()));
            streamCtxt.start();
            streamCtxt.awaitTerminationOrTimeout(2000);
            streamCtxt.stop();
            log.info("here is the list: "   list.stream().map(j->String.valueOf(j)).collect(Collectors.joining(",")));       
        }
  

Вывод:

 here is the list: 1,2,3,4,5,1,2,3,4,5