Sidekiq обрабатывает повторную очередь при обработке больших данных

#ruby-on-rails #mongodb #heroku #sidekiq #idempotent

#ruby-on-rails #mongodb #heroku #sidekiq #идемпотент

Вопрос:

Смотрите обновленный вопрос ниже.

Оригинальный вопрос:

В моем текущем проекте Rails мне нужно проанализировать большой файл данных xml / csv и сохранить его в mongodb. Прямо сейчас я использую эти шаги:

  1. Получите загруженный файл от пользователя, сохраните данные в mongodb
  2. Используйте sidekiq для выполнения асинхронной обработки данных в mongodb.
  3. После завершения процесса удалите необработанные данные.

Для небольших и средних данных на localhost описанные выше шаги выполняются хорошо. Но в heroku я использую hirefire для динамического масштабирования рабочего dyno вверх и вниз. Когда рабочий все еще обрабатывает большие данные, hirefire видит пустую очередь и сокращает рабочий режим. Это отправляет процессу сигнал завершения и оставляет процесс в незавершенном состоянии.

Я ищу лучший способ выполнить синтаксический анализ, разрешить прерывание процесса синтаксического анализа в любое время (сохранение текущего состояния при получении сигнала завершения) и разрешить повторную постановку процесса в очередь.

Прямо сейчас я использую Model.delay.parse_file и он не попадает в повторную очередь.

Обновить

Прочитав sidekiq wiki, я нашел статью об управлении заданиями. Кто-нибудь может объяснить код, как он работает и как он сохраняет свое состояние при получении сигнала SIGTERM, а рабочий получает повторную очередь?

Есть ли какой-либо альтернативный способ обработки завершения задания, сохранения текущего состояния и продолжения с последней позиции?

Спасибо,

Ответ №1:

Возможно, было бы проще объяснить процесс и шаги высокого уровня, привести пример реализации (урезанную версию той, которую я использую), а затем рассказать о throw и catch:

  1. Вставьте необработанные строки csv с увеличивающимся индексом (чтобы иметь возможность возобновить работу с определенной строки / индекса позже)
  2. Обработайте CSV, останавливая каждый «фрагмент», чтобы проверить, выполнено ли задание, проверяя, Sidekiq::Fetcher.done? возвращает ли значение true
  3. Когда выборка выполняется done? , сохраните индекс текущего обрабатываемого элемента у пользователя и выполните возврат, чтобы задание completes и управление были возвращены sidekiq.
  4. Обратите внимание, что если задание все еще выполняется после короткого тайм-аута (по умолчанию 20 секунд), задание будет остановлено.
  5. Затем, когда задание выполняется снова, просто начните с того места, на котором вы остановились в прошлый раз (или на 0)

Пример:

     class UserCSVImportWorker
      include Sidekiq::Worker

      def perform(user_id)
        user = User.find(user_id)

        items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
        items.each_with_index do |item, i|
          if (i 1 % 100) == 0 amp;amp; Sidekiq::Fetcher.done?
            user.update(last_csv_index: item.index)

            return
          end

          # Process the item as normal
        end
      end
    end
  

Вышеупомянутый класс гарантирует, что каждые 100 элементов мы проверяем, что выборка не завершена (прокси для того, было ли запущено завершение работы), и завершает выполнение задания. Однако перед завершением выполнения мы обновляем пользователя последним index , который был обработан, чтобы в следующий раз мы могли начать с того, на чем остановились.

throw catch — это способ реализовать эту вышеупомянутую функциональность немного чище (возможно), но это немного похоже на использование волокон, хорошая концепция, но ее трудно охватить. Технически throw catch больше похож на goto, чем это обычно удобно большинству людей.

Редактировать

Также вы не смогли вызвать Sidekiq::Fetcher.done? и записать last_csv_index в каждой строке или в каждом обработанном фрагменте строк, таким образом, если ваш рабочий будет убит, не имея возможности записать last_csv_index , вы все равно сможете возобновить «закрытие» с того места, где вы остановились.

Комментарии:

1. Привет @nort, у меня к тебе дополнительный вопрос. Как обрабатывать Sidekiq::Fetcher.done? внутренний цикл в другом классе? При выполнении у меня есть только один вызов для ModelContainer.find(model_id).parse_data

2. на самом деле все должно работать нормально, хотя и немного не инкапсулировано… по имеющейся у меня логике я склонен использовать workers, а не models, чтобы модели оставались тонкими.

Ответ №2:

Вы пытаетесь обратиться к концепции идемпотентности, идее о том, что многократная обработка чего-либо с потенциальными неполными циклами не вызывает проблем. ( https://github.com/mperham/sidekiq/wiki/Best-Practices#2-make-your-jobs-idempotent-and-transactional )

Возможные шаги вперед

  1. Разделите файл на части и обработайте эти части заданием для каждой части.
  2. Поднимите пороговое значение для hirefire, чтобы оно масштабировалось, когда задания, вероятно, будут полностью завершены (10 минут)
  3. Не разрешайте hirefire уменьшать масштаб во время выполнения задания (установите клавишу redis при запуске и clear по завершении)
  4. Отслеживайте ход выполнения задания по мере его обработки и продолжайте с того места, на котором вы остановились, если задание завершается.