Несовместимые типы: список CSV-записей java

#java #list #apache-spark #rdd

#java #Список #apache-spark #rdd

Вопрос:

Я новичок в Spark-Java , я хочу получить подсписок из списка в Java 8 . Затем я конвертирую его в RDD. Я сделал это в приведенном ниже коде:

             List<CSVRecord> inputRecords = readInputLayer(actorSystem, inputCatalog, inputCatalogVersion);
            LOGGER.info("Number of partition "  inputRecords.size());

            List<CSVRecord> inputRecordsTmp = inputRecords.stream().limit(100).collect(Collectors.toList());


            JavaRDD<CSVRecord> inputRecordsJavaRDD = JavaSparkContext.emptyRDD();
            for (List<CSVRecord> partition: inputRecordsTmp ){
                JavaRDD<CSVRecord> inputRecordsTmpRDD = (JavaRDD<CSVRecord>) JavaSparkContext.parallelize(partition);
                inputRecordsJavaRDD = JavaSparkContext.union(inputRecordsJavaRDD,inputRecordsTmpRDD);

            }

        LOGGER.info("Number of lines to insert JAVA RDD ="  inputRecordsJavaRDD.count());
  

Но я получил ошибку в цикле для,
он не принимает List<CSVRecord> partition:

 Incompatible types:
Required: org.apache.commons.csvRecord
Found: java.util.list  <org.apache.commons.csvRecord>
  

Как я могу это исправить?
Спасибо

Комментарии:

1. Зачем использовать RDD вместо фрейма данных?

Ответ №1:

РЕДАКТИРОВАТЬ: Просто взглянул на документацию. parallelize фактически принимает список, но это означает только то, что вам вообще не нужен foreach. Вы должны быть в состоянии просто передать свои inputRecordsTmp права в parallelize и извлечь из этого свой RDD.

Тем не менее, поскольку foreach сформулирован неправильно, я оставлю это здесь для получения дополнительной информации:

Прежде всего, ваш цикл for-each не выглядит для меня абсолютно правильным. При написании цикла for-each-loop на Java предполагается, что часть перед двоеточием относится к типу элемента в коллекции, которую вы перебираете.

В вашем случае у вас есть список, содержащий объекты CSVRecord. В for-each вы в основном говорите «для каждого CSVRecord в этом списке объектов CSVRecord …». Однако вы написали «для каждого списка объектов CSVRecord в этом списке объектов CSVRecord …», что не имеет особого смысла.

Java уже знает, что inputRecordsTmp это список CSVRecords, поэтому нет необходимости указывать это снова где-либо. Вместо этого вы хотите сообщить, что для каждого объекта в списке вы хотите извлечь этот объект в переменную ( partition в вашем случае), чтобы вы могли использовать этот извлеченный объект внутри вашего цикла.

Поэтому вместо того, чтобы писать

 for (List<CSVRecord> partition: inputRecordsTmp ){
  

Вам пришлось бы написать

 for (CSVRecord record: inputRecordsTmp ){
  

Это может решить вашу проблему. Если нет, не могли бы вы, пожалуйста, добавить информацию о том, в какой именно строке происходит сбой?

Комментарии:

1. Спасибо за ваш naswer. Ошибка кода в строке цикла для (раздела List<CSVRecord>){. Итак, вы правы, я должен использовать в цикле одну запись, если в списке CSVRecord. Это может решить половину проблемы, потому что, когда я выполняю ваше решение, я получаю другую ошибку при распараллеливании (разделении). Так что это нормально, потому что я пытаюсь преобразовать список в RDD, а раздел не является списком. В моем примере выше я хочу просто взять 100 элементов из списка<CSVRecord>, зациклить его, затем я должен вернуть RDD, как я сделал в моем примере, но я ошибся в несовместимости типов.

2. @Dorine Спасибо за ваш комментарий! Вы видели мою правку о parallelize ? Поскольку он принимает List , насколько я вижу, этого должно быть достаточно, если вы только что передали свой список со 100 записями в качестве аргумента. Цикл for не должен быть необходимым.

3. Я поступил так, как вы описали выше, я просто преобразовал список в RDD, используя функцию parallelize. Спасибо

4. Рад, что смог помочь!