Как регулярно загружать много файлов и начинать с того места, где он остановился?

#spring #spring-boot #spring-batch

#spring #spring-boot #spring-batch

Вопрос:

В каталог регулярно поступает много файлов для загрузки весенним пакетом. Мой пакет Spring Batch Jar запускается внешним планировщиком. Если в файле есть ошибка, я бы хотел, чтобы были загружены другие файлы и чтобы загрузка продолжалась с того места, где она была прервана для файла с ошибкой (после исправления моего файла)

Это мой очень простой пример :


SpringBatchSimpleTestApplication.java

 @SpringBootApplication
public class SpringBatchSimpleTestApplication {

    private static final Logger log = LoggerFactory.getLogger(SpringBatchSimpleTestApplication.class);
    
    public static void main(String[] args) throws Exception {
            
        System.exit(SpringApplication.exit(SpringApplication.run(SpringBatchSimpleTestApplication.class, args)));
    }
    
}
  

PersonJobConfiguration.java

 public class PersonJobConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Value("classpath*:person*.csv")
    private Resource[] inputFiles;

    @Bean
    public MultiResourceItemReader<Person> multiResourceItemReader1() {
    return new MultiResourceItemReaderBuilder<Person>()                  
        .name("multiResourceItemReader1")
            .delegate(reader1())
            .resources(inputFiles)
            .build();
    }

    @Bean
    public FlatFileItemReader<Person> reader1() {
    return new FlatFileItemReaderBuilder<Person>() 
        .name("personItemReader")
        .lineTokenizer(new FixedLengthTokenizer()  {{ setNames("firstName", "lastName", "age'"); setColumns(new Range(1,4), new Range(5,6),new Range(7)); }})
        .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }})
        .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer1(DataSource dataSource) {         
        JdbcBatchItemWriter<Person> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
        jdbcBatchItemWriter.setAssertUpdates(true);
        jdbcBatchItemWriter.setDataSource(dataSource);
        jdbcBatchItemWriter.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());

        return jdbcBatchItemWriter;
    }

    @Bean
    public Job person_job(JobEndListener listener, Step step1) {
        return jobBuilderFactory.get("person_job")
            //.incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
            .<Person, Person> chunk(10)
            .reader(multiResourceItemReader1())
            .writer(writer)
            .build();
    }

}
  

Person.java

 public class Person {

    private String lastName;
    private String firstName;
    private String age;
    

    public Person(String lastName, String firstName, String age) {
        super();
        this.lastName = lastName;
        this.firstName = firstName;
        this.age = age;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "firstName: "   firstName   ", lastName: "   lastName;
    }

}
  

Моя проблема :

  • Если я использую «.incrementer(новый RunIdIncrementer())» в задании, новое задание создается при каждом выполнении моего проекта. Следовательно, если загрузка не удалась, она начинается с начала файла, но не с того места, где она остановилась.

  • Если я не использую «.incrementer(new RunIdIncrementer())» в задании, оно работает, оно начинается с того места, где оно остановилось, но, если я добавляю новые файлы для загрузки, они не загружаются, потому что не создано новое задание.

Как загрузить некоторые файлы, регулярно поступающие в один и тот же каталог? Является ли лучший способ использовать «.incrementer(new RunIdIncrementer())» в этом случае ? Если нет, то как?

Чтобы продолжить загрузку с того места, где она была прервана, я не хочу вводить идентификатор каждого задания для перезапуска. Я хочу, чтобы это делалось автоматически. Должен ли я получать неудачные задания в базе данных, а затем перезапускать их? Если да, то как?

Заранее благодарю вас за вашу помощь. С уважением.

Комментарии:

1. Экземпляр задания должен быть определен для фиксированного набора данных (для возможности перезапуска), и это делается путем разумного выбора набора идентифицирующих параметров задания, каковы ваши параметры задания? Инкремент используется, когда существует четко определенная последовательность экземпляров заданий, есть ли у вас такая последовательность? Вам действительно нужен инкрементатор?

2. Мой принцип: — В папку поступает много файлов с одинаковой структурой — запускается пакет Spring Batch: он загружает все файлы папки в базу данных. — Затем новые файлы с той же структурой поступают в ту же папку — Пакет Spring запускается снова и должен загружать файлы в базу данных. — и так далее… В этом случае можем ли мы сказать, что существует четко определенная последовательность экземпляров заданий ? Обязательно ли мне использовать инкрементатор? (На данный момент я не использую JobParameters)

3. Как запускается ваша работа? Основано ли это на обычном расписании или с помощью пользовательского события? Можете ли вы поделиться кодом того, как вы запускаете задания? If there is an error in a file, I'd like that the other files be loaded and, that the loading pick up where it left off for the file with error (after having correct my file) : Пакет Spring Batch должен знать, как идентифицировать фиксированный набор данных, чтобы иметь возможность перезапустить с того места, где он остановился, поэтому вам нужно указать, как однозначно идентифицировать экземпляр задания, используя параметры определения задания. Выбор параметров задания имеет решающее значение для правильной разработки перезапускаемого пакетного задания Spring.

4. Я только что добавил больше уточнений к своей проблеме.

5. хорошо, спасибо. Я добавил ответ, надеюсь, это поможет.

Ответ №1:

Что здесь усложняет, так это то, что одно задание обрабатывает несколько файлов одновременно.

Следуя философии Unix, согласно которой одна вещь выполняет одно дело и делает это хорошо, я рекомендую сделать так, чтобы каждое задание обрабатывалось в одном файле. Вы можете создать метод по расписанию, который считывает содержимое каталога и запускает задание для каждого файла (уникальное имя файла является параметром задания). Успешно загруженные файлы могут быть перемещены в другой каталог. Теперь, если задание завершается с ошибкой, это не влияет на загрузку других файлов, и при следующем запуске файл с ошибкой будет перезапущен с того места, где он был прерван.

Этот подход имеет ряд преимуществ:

  • Масштабируемость: это позволяет запускать несколько заданий параллельно (подход чтения с несколькими источниками будет загружать файлы последовательно)
  • Отказоустойчивость: это позволяет перезапустить сбойный файл с того места, где он остановился
  • Для этого не требуется инкремент.
  • И т. Д

Выбор параметров задания имеет решающее значение для правильной разработки перезапускаемого пакетного задания Spring.

Комментарии:

1. Большое вам спасибо за ваш ответ. Если я хорошо понимаю: — Я должен создать столько запланированных методов, сколько у меня есть каталогов, в которых у меня есть файлы для загрузки. — Затем каждый из этих методов просматривает файлы каталогов, и для каждого файла я должен запустить задание с параметром имени загружаемого файла. — Если загрузка завершается неудачно, я должен исправить свой файл и перезапустить пакет Spring Batch с указанием в параметре имени файла с ошибкой. Правильно ли это? Это лучший способ сделать?

2. все правильно, за исключением наличия запланированного метода для каждого каталога. Я не понимаю, зачем вам это нужно. Метод по расписанию может проходить через один или несколько каталогов, содержащих входные файлы, и запускать задание для каждого файла. Пожалуйста, примите ответ, если это помогло.

3. Я подумал сделать один запланированный метод по каталогу, потому что я хотел запускать загрузки в разные моменты по соображениям производительности.

4. Здравствуйте, я пытался, как вы сказали, запустить задание по файлу. Я каждый раз помещаю имя файла в JobParameter и только его . Он хорошо загружает все мои файлы. Однако, если я попытаюсь загрузить уже загруженный файл, пакет Spring загрузит его во второй раз. Это нормально?

5. Нет, этого не должно быть, если предыдущий запуск не удался. Если вы попытаетесь запустить тот же экземпляр задания, который был завершен для данного файла, вы должны получить JobInstanceAlreadyCompleteException . Вы уверены, что нет инкрементера, который добавляет другой параметр?