Запуск пакетного задания Spring в асинхронном методе вызывает ленивое исключение

#java #spring-boot #spring-batch #spring-async

Вопрос:

Мне нужно запустить пакетное задание spring асинхронным методом, но я получил ленивое исключение в обработчике элементов:

Это мой Подход:

Служба, содержащая асинхронный метод, выглядит следующим образом:

 @Service
@RequiredArgsConstructor
@Slf4j
public class BatchService {

    @Qualifier(value = "attachmentsJob")
    private final Job job;
    @Qualifier(value = "asyncJobLauncher")
    private final JobLauncher jobLauncher;
    private final JobLogService jobLogService;

    @Async
    public void migrateAttachments() {
        JobLogDB jobLogDB = jobLogService.createStartJobLog(job);
        try {
            Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("jobId", new JobParameter(jobLogDB.getId()));
            jobLauncher.run(job, new JobParameters(parameters));
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            e.printStackTrace();
            jobLogService.markJobAsFailed(jobLogDB.getId());
        }
    }
}
 

Конфигурация пакета является:

 @Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class BatchConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final JobRepository jobRepository;
    private final StepBuilderFactory stepBuilderFactory;
    private final MessageRepository messageRepository;
    private final PlatformTransactionManager platformTransactionManager;
    private final MessageDbAttachmentsProcessor messageDbAttachmentsProcessor;

    @Bean(name = "asyncBatchTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MigrateAttachmentsThread-");
        executor.initialize();
        return executor;
    }

    /**
     * Batch Reader
     */
    @Bean
    public RepositoryItemReader<MessageDB> reader() {
        return new RepositoryItemReaderBuilder<MessageDB>().name("readerName")
                .repository(messageRepository)
                .methodName("findAllByTaskIdIsNotNull")
                .pageSize(10)
                .sorts(Collections.singletonMap("taskId", Sort.Direction.ASC))
                .build();
    }

    /**
     * Batch Processor
     */
    @Bean
    public AsyncItemProcessor<MessageDB, MessageDB> processor() {
        AsyncItemProcessor<MessageDB, MessageDB> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(messageDbAttachmentsProcessor);
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    /**
     * Batch Writer
     */
    @Bean
    public RepositoryItemWriter<MessageDB> writer() {
        RepositoryItemWriter repositoryItemWriter = new RepositoryItemWriter();
        repositoryItemWriter.setRepository(messageRepository);
        repositoryItemWriter.setMethodName("save");
        return repositoryItemWriter;
    }

    @Bean
    public AsyncItemWriter<MessageDB> asyncWriter() {
        AsyncItemWriter<MessageDB> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        return asyncItemWriter;
    }

    @Bean(name = "attachmentsJob")
    public Job migrateTaskAttachmentsJob(JobCompletionNotificationListener listener, Step step) {
        return jobBuilderFactory.get("taskAttachmentsJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step)
                .end()
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step")
                .<MessageDB, Future<MessageDB>>chunk(5)
                .reader(reader())
                .processor(processor())
                .writer(asyncWriter())
                .transactionManager(platformTransactionManager)
                .build();
    }

    @Bean(name = "asyncJobLauncher")
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor());
        return jobLauncher;
    }
}
 

Когда я вызываю метод в MessageDbAttachmentsProcessor, я получаю ленивое исключение:

 org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: xxx.xxx.xxx.xxx, could not initialize proxy - no Session
    at org.hibernate.collection.internal.AbstractPersistentCollection.throwLazyInitializationException(AbstractPersistentCollection.java:606)
    at org.hibernate.collection.internal.AbstractPersistentCollection.withTemporarySessionIfNeeded(AbstractPersistentCollection.java:218)
    at org.hibernate.collection.internal.AbstractPersistentCollection.initialize(AbstractPersistentCollection.java:585)
    at org.hibernate.collection.internal.AbstractPersistentCollection.write(AbstractPersistentCollection.java:409)
    at org.hibernate.collection.internal.PersistentBag.add(PersistentBag.java:407)

 

I tried to fix it by adding @Transactional(propagation = Propagation.REQUIRES_NEW) on migrateAttachments method but without success, after I added I got following exception:

 java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
 

I do not know what to change and in which direction to modify. Any help is welcomed.