Как наиболее эффективно обрабатывать строки CSV-файла с помощью Groovy / GPars?

#groovy

#groovy

Вопрос:

Вопрос простой, и я удивлен, что он не появился сразу, когда я его искал.

У меня есть CSV-файл, потенциально очень большой, который необходимо обработать. Каждая строка должна передаваться процессору до тех пор, пока не будут обработаны все строки. Для чтения CSV-файла я буду использовать OpenCSV, который, по сути, предоставляет метод readNext(), который выдает мне следующую строку. Если больше нет доступных строк, все процессоры должны завершиться.

Для этого я создал действительно простой скрипт groovy, определил синхронный метод readNext() (поскольку чтение следующей строки на самом деле не занимает много времени), а затем создал несколько потоков, которые считывают следующую строку и обрабатывают ее. Это работает нормально, но…

Разве не должно быть встроенного решения, которое я мог бы просто использовать? Это не обработка коллекции gpars, потому что это всегда предполагает наличие существующей коллекции в памяти. Вместо этого я не могу позволить себе прочитать все это в память, а затем обработать, это приведет к исключениям outofmemory.

Итак …. у кого-нибудь есть хороший шаблон для обработки CSV-файла «построчно» с использованием пары рабочих потоков?

Ответ №1:

Одновременный доступ к файлу может быть не очень хорошей идеей, а обработка fork / join в GPars предназначена только для данных в памяти (коллекций). Я бы предложил последовательно считывать файл в список. Когда список достигнет определенного размера, одновременно обработайте записи в списке с помощью GPars, очистите список и затем перейдите к чтению строк.

Ответ №2:

Это может быть серьезной проблемой для участников. Субъект синхронного чтения может передавать строки CSV субъектам параллельного процессора. Например:

 @Grab(group='org.codehaus.gpars', module='gpars', version='0.12')

import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actor

class CsvReader extends DefaultActor {
    void act() {
        loop {
            react {
                reply readCsv()
            }
        }
    }
}

class CsvProcessor extends DefaultActor {
    Actor reader
    void act() {
        loop {
            reader.send(null)
            react {
                if (it == null)
                    terminate()
                else
                    processCsv(it)
            }
        }
    }
}

def N_PROCESSORS = 10
def reader = new CsvReader().start()
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join()
  

Комментарии:

1. Предполагаете ли вы в этом примере, что вызов readCsv() возвращает одну строку CSV? Просто хочу убедиться, что я правильно это читаю.

2. Да, readCsv() прочитал бы каждую строку последовательно. Когда достигнут конец файла, он вернет null, что позволяет процессорам знать, что конец достигнут, и они должны terminate() .

Ответ №3:

Я просто завершаю реализацию подобной проблемы в Grails (вы не указываете, используете ли вы grails, обычный hibernate, обычный JDBC или что-то еще).

Я не знаю ничего из того, что вы можете получить из коробки. Вы могли бы рассмотреть интеграцию с Spring Batch, но в последний раз, когда я смотрел на нее, она показалась мне очень тяжелой (и не очень заводной).

Если вы используете обычный JDBC, делать то, что рекомендует Кристоф, вероятно, проще всего (читать в N строк и использовать GPars для одновременного просмотра этих строк).

Если вы используете grails или hibernate и хотите, чтобы ваши рабочие потоки имели доступ к контексту spring для внедрения зависимостей, все становится немного сложнее.

Я решил это с помощью плагина Grails Redis (отказ от ответственности: Я автор) и плагин Jesque, который является java-реализацией Resque.

Плагин Jesque позволяет создавать классы «Job», которые имеют метод «process» с произвольными параметрами, которые используются для обработки работы, поставленной в очередь в очереди Jesque. Вы можете развернуть столько рабочих, сколько захотите.

У меня есть загрузка файла, в который пользователь-администратор может отправить файл, он сохраняет файл на диск и ставит в очередь задание для созданного мной ProducerJob. Это задание ProducerJob прокручивается по файлу, для каждой строки оно помещает в очередь сообщение для получения ConsumerJob. Сообщение представляет собой просто отображение значений, считанных из CSV-файла.

ConsumerJob принимает эти значения и создает соответствующий объект домена для своей строки и сохраняет его в базе данных.

Мы уже использовали Redis в производстве, поэтому использование этого в качестве механизма очередей имело смысл. У нас была старая синхронная загрузка, которая последовательно выполняла загрузку файлов. В настоящее время я использую одного работника-производителя и 4 работника-потребителя, и загрузка данных таким образом происходит более чем в 100 раз быстрее, чем при старой загрузке (с гораздо лучшей обратной связью с конечным пользователем).

Я согласен с первоначальным вопросом о том, что, вероятно, есть место для упаковки чего-то подобного, поскольку это относительно распространенная вещь.

ОБНОВЛЕНИЕ: я разместил сообщение в блоге с простым примером выполнения импорта с помощью Redis Jesque.