#groovy
#groovy
Вопрос:
Вопрос простой, и я удивлен, что он не появился сразу, когда я его искал.
У меня есть CSV-файл, потенциально очень большой, который необходимо обработать. Каждая строка должна передаваться процессору до тех пор, пока не будут обработаны все строки. Для чтения CSV-файла я буду использовать OpenCSV, который, по сути, предоставляет метод readNext(), который выдает мне следующую строку. Если больше нет доступных строк, все процессоры должны завершиться.
Для этого я создал действительно простой скрипт groovy, определил синхронный метод readNext() (поскольку чтение следующей строки на самом деле не занимает много времени), а затем создал несколько потоков, которые считывают следующую строку и обрабатывают ее. Это работает нормально, но…
Разве не должно быть встроенного решения, которое я мог бы просто использовать? Это не обработка коллекции gpars, потому что это всегда предполагает наличие существующей коллекции в памяти. Вместо этого я не могу позволить себе прочитать все это в память, а затем обработать, это приведет к исключениям outofmemory.
Итак …. у кого-нибудь есть хороший шаблон для обработки CSV-файла «построчно» с использованием пары рабочих потоков?
Ответ №1:
Одновременный доступ к файлу может быть не очень хорошей идеей, а обработка fork / join в GPars предназначена только для данных в памяти (коллекций). Я бы предложил последовательно считывать файл в список. Когда список достигнет определенного размера, одновременно обработайте записи в списке с помощью GPars, очистите список и затем перейдите к чтению строк.
Ответ №2:
Это может быть серьезной проблемой для участников. Субъект синхронного чтения может передавать строки CSV субъектам параллельного процессора. Например:
@Grab(group='org.codehaus.gpars', module='gpars', version='0.12')
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actor
class CsvReader extends DefaultActor {
void act() {
loop {
react {
reply readCsv()
}
}
}
}
class CsvProcessor extends DefaultActor {
Actor reader
void act() {
loop {
reader.send(null)
react {
if (it == null)
terminate()
else
processCsv(it)
}
}
}
}
def N_PROCESSORS = 10
def reader = new CsvReader().start()
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join()
Комментарии:
1. Предполагаете ли вы в этом примере, что вызов readCsv() возвращает одну строку CSV? Просто хочу убедиться, что я правильно это читаю.
2. Да,
readCsv()
прочитал бы каждую строку последовательно. Когда достигнут конец файла, он вернет null, что позволяет процессорам знать, что конец достигнут, и они должныterminate()
.
Ответ №3:
Я просто завершаю реализацию подобной проблемы в Grails (вы не указываете, используете ли вы grails, обычный hibernate, обычный JDBC или что-то еще).
Я не знаю ничего из того, что вы можете получить из коробки. Вы могли бы рассмотреть интеграцию с Spring Batch, но в последний раз, когда я смотрел на нее, она показалась мне очень тяжелой (и не очень заводной).
Если вы используете обычный JDBC, делать то, что рекомендует Кристоф, вероятно, проще всего (читать в N строк и использовать GPars для одновременного просмотра этих строк).
Если вы используете grails или hibernate и хотите, чтобы ваши рабочие потоки имели доступ к контексту spring для внедрения зависимостей, все становится немного сложнее.
Я решил это с помощью плагина Grails Redis (отказ от ответственности: Я автор) и плагин Jesque, который является java-реализацией Resque.
Плагин Jesque позволяет создавать классы «Job», которые имеют метод «process» с произвольными параметрами, которые используются для обработки работы, поставленной в очередь в очереди Jesque. Вы можете развернуть столько рабочих, сколько захотите.
У меня есть загрузка файла, в который пользователь-администратор может отправить файл, он сохраняет файл на диск и ставит в очередь задание для созданного мной ProducerJob. Это задание ProducerJob прокручивается по файлу, для каждой строки оно помещает в очередь сообщение для получения ConsumerJob. Сообщение представляет собой просто отображение значений, считанных из CSV-файла.
ConsumerJob принимает эти значения и создает соответствующий объект домена для своей строки и сохраняет его в базе данных.
Мы уже использовали Redis в производстве, поэтому использование этого в качестве механизма очередей имело смысл. У нас была старая синхронная загрузка, которая последовательно выполняла загрузку файлов. В настоящее время я использую одного работника-производителя и 4 работника-потребителя, и загрузка данных таким образом происходит более чем в 100 раз быстрее, чем при старой загрузке (с гораздо лучшей обратной связью с конечным пользователем).
Я согласен с первоначальным вопросом о том, что, вероятно, есть место для упаковки чего-то подобного, поскольку это относительно распространенная вещь.
ОБНОВЛЕНИЕ: я разместил сообщение в блоге с простым примером выполнения импорта с помощью Redis Jesque.