#java #list #apache-spark #rdd
#java #Список #apache-spark #rdd
Вопрос:
Я новичок в Spark-Java
, я хочу получить подсписок из списка в Java 8
. Затем я конвертирую его в RDD. Я сделал это в приведенном ниже коде:
List<CSVRecord> inputRecords = readInputLayer(actorSystem, inputCatalog, inputCatalogVersion);
LOGGER.info("Number of partition " inputRecords.size());
List<CSVRecord> inputRecordsTmp = inputRecords.stream().limit(100).collect(Collectors.toList());
JavaRDD<CSVRecord> inputRecordsJavaRDD = JavaSparkContext.emptyRDD();
for (List<CSVRecord> partition: inputRecordsTmp ){
JavaRDD<CSVRecord> inputRecordsTmpRDD = (JavaRDD<CSVRecord>) JavaSparkContext.parallelize(partition);
inputRecordsJavaRDD = JavaSparkContext.union(inputRecordsJavaRDD,inputRecordsTmpRDD);
}
LOGGER.info("Number of lines to insert JAVA RDD =" inputRecordsJavaRDD.count());
Но я получил ошибку в цикле для,
он не принимает List<CSVRecord> partition:
Incompatible types:
Required: org.apache.commons.csvRecord
Found: java.util.list <org.apache.commons.csvRecord>
Как я могу это исправить?
Спасибо
Комментарии:
1. Зачем использовать RDD вместо фрейма данных?
Ответ №1:
РЕДАКТИРОВАТЬ: Просто взглянул на документацию. parallelize
фактически принимает список, но это означает только то, что вам вообще не нужен foreach. Вы должны быть в состоянии просто передать свои inputRecordsTmp
права в parallelize и извлечь из этого свой RDD.
Тем не менее, поскольку foreach сформулирован неправильно, я оставлю это здесь для получения дополнительной информации:
Прежде всего, ваш цикл for-each не выглядит для меня абсолютно правильным. При написании цикла for-each-loop на Java предполагается, что часть перед двоеточием относится к типу элемента в коллекции, которую вы перебираете.
В вашем случае у вас есть список, содержащий объекты CSVRecord. В for-each вы в основном говорите «для каждого CSVRecord в этом списке объектов CSVRecord …». Однако вы написали «для каждого списка объектов CSVRecord в этом списке объектов CSVRecord …», что не имеет особого смысла.
Java уже знает, что inputRecordsTmp
это список CSVRecords, поэтому нет необходимости указывать это снова где-либо. Вместо этого вы хотите сообщить, что для каждого объекта в списке вы хотите извлечь этот объект в переменную ( partition
в вашем случае), чтобы вы могли использовать этот извлеченный объект внутри вашего цикла.
Поэтому вместо того, чтобы писать
for (List<CSVRecord> partition: inputRecordsTmp ){
Вам пришлось бы написать
for (CSVRecord record: inputRecordsTmp ){
Это может решить вашу проблему. Если нет, не могли бы вы, пожалуйста, добавить информацию о том, в какой именно строке происходит сбой?
Комментарии:
1. Спасибо за ваш naswer. Ошибка кода в строке цикла для (раздела List<CSVRecord>){. Итак, вы правы, я должен использовать в цикле одну запись, если в списке CSVRecord. Это может решить половину проблемы, потому что, когда я выполняю ваше решение, я получаю другую ошибку при распараллеливании (разделении). Так что это нормально, потому что я пытаюсь преобразовать список в RDD, а раздел не является списком. В моем примере выше я хочу просто взять 100 элементов из списка<CSVRecord>, зациклить его, затем я должен вернуть RDD, как я сделал в моем примере, но я ошибся в несовместимости типов.
2. @Dorine Спасибо за ваш комментарий! Вы видели мою правку о
parallelize
? Поскольку он принимаетList
, насколько я вижу, этого должно быть достаточно, если вы только что передали свой список со 100 записями в качестве аргумента. Цикл for не должен быть необходимым.3. Я поступил так, как вы описали выше, я просто преобразовал список в RDD, используя функцию parallelize. Спасибо
4. Рад, что смог помочь!