#java #spring #hadoop #spring-boot #spring-batch
#java #spring #hadoop #spring-boot #spring-batch
Вопрос:
Предположим, у меня есть 10 записей, и некоторые из них являются поврежденными записями, так как spring будет обрабатывать перезапуск.
Пример предположим, что записи № 3 и 7 повреждены, и они переходят в другой редуктор, тогда как spring будет обрабатывать перезапуск 1. как он будет поддерживать очередь для отслеживания последнего сбоя. 2. какими различными способами мы можем решить эту проблему
Ответ №1:
SpringBatch будет делать именно то, что вы говорите SpringBatch делать.
Перезапуск для SpringBatch означает запуск того же задания, которое не удалось выполнить, с тем же набором входных параметров. Однако будет создан новый экземпляр (выполнение) этого задания.
Задание будет выполняться с тем же набором данных, с которым выполнялся неудачный экземпляр задания. В общем, не стоит изменять набор входных данных для вашего задания — входные данные задания MapReduce должны быть неизменяемыми (я полагаю, вы не будете изменять тот же набор данных, который вы используете в качестве входных).
В вашем случае задание, скорее всего, завершится с BatchStatus.COMPLETED
помощью, если вы не добавите очень специфическую логику на последнем шаге вашего задания SpringBatch. Этот последний шаг проверит все записи, и если какие-либо неработающие записи будут обнаружены искусственно, статус задания будет установлен BatchStatus.FAILED
как показано ниже:
jobExecution.setStatus(BatchStatus.FAILED)
Теперь, как перезапустить задание, это хороший вопрос, на который я отвечу через несколько минут.
Однако, прежде чем перезаписать вопрос, который вам нужно задать: если набор входных данных для вашего задания MapReduce и код вашего задания MapReduce не изменились, как restrt вам поможет?
Я думаю, вам нужно иметь какой-то набор данных, в который вы сбрасываете все неверные записи, которые не удалось обработать исходному заданию MapReduce. Вам решать, как обрабатывать эти неработающие записи.
В любом случае, перезапустить задание SpringBatch легко, если вы знаете, какой идентификатор failed jobExecution
. Ниже приведен код:
final Long restartId = jobOperator.restart(failedJobId);
final JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
Редактировать
Прочитайте об интерфейсах ItemReader, ItemWriter и ItemProcessor Я думаю, что вы можете добиться отслеживания с помощью CompositeItemProcessor. В Hadoop каждая запись в файле должна иметь уникальный идентификатор. Итак, я думаю, вы можете сохранить список идентификаторов неверной записи в контексте задания. Вызовите обновление JobParameter
, которое вы создали бы при первом запуске задания badRecordsList
. Теперь, когда вы перезапускаете / возобновляете свою работу, вы прочитаете значение badRecordsList
и получите ссылку.
Комментарии:
1. Согласен, но проблема здесь в том, что предположим, что у меня есть 10 записей и 3 редуктора, и предположим, что 3-я запись переходит в редуктор1, а 7-я запись в редуктор 3, а редуктор 2 обрабатывает 3 записи 4-6, тогда как и из чего будет работать логика перезапуска? Будет ли он снова обрабатывать обработанные записи. ПОДДЕРЖИВАЕТ ли spring somwhere обработанные записи?