#java #spring-boot #spring-batch #spring-async
Вопрос:
Мне нужно запустить задание Spring Batch из асинхронного метода, но в обработчике элементов (ItemProcessor) я получаю исключение LazyInitializationException.
Вот мой подход.
Служба, содержащая асинхронный метод, выглядит следующим образом:
@Service
@RequiredArgsConstructor
@Slf4j
public class BatchService {
@Qualifier(value = "attachmentsJob")
private final Job job;
@Qualifier(value = "asyncJobLauncher")
private final JobLauncher jobLauncher;
private final JobLogService jobLogService;
@Async
public void migrateAttachments() {
JobLogDB jobLogDB = jobLogService.createStartJobLog(job);
try {
Map<String, JobParameter> parameters = new HashMap<>();
parameters.put("jobId", new JobParameter(jobLogDB.getId()));
jobLauncher.run(job, new JobParameters(parameters));
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
e.printStackTrace();
jobLogService.markJobAsFailed(jobLogDB.getId());
}
}
}
Конфигурация Spring Batch выглядит так:
/**
 * Spring Batch configuration for the attachments migration job: a repository
 * reader, an asynchronous processor/writer pair, the step, the job and an
 * asynchronous job launcher.
 */
@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class BatchConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final JobRepository jobRepository;
    private final StepBuilderFactory stepBuilderFactory;
    private final MessageRepository messageRepository;
    private final PlatformTransactionManager platformTransactionManager;
    private final MessageDbAttachmentsProcessor messageDbAttachmentsProcessor;

    /**
     * Thread pool used by both the asynchronous item processor and the
     * asynchronous job launcher defined in this class.
     *
     * <p>Core size, max size and queue capacity are all 64; when the pool and
     * queue are full the submitting thread runs the task itself
     * ({@code CallerRunsPolicy}).
     *
     * @return the initialized executor
     */
    @Bean(name = "asyncBatchTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MigrateAttachmentsThread-");
        executor.initialize();
        return executor;
    }

    /**
     * Batch reader: pages through {@code findAllByTaskIdIsNotNull} on the
     * message repository, 10 items per page, sorted by {@code taskId} ascending.
     *
     * @return the configured reader
     */
    @Bean
    public RepositoryItemReader<MessageDB> reader() {
        return new RepositoryItemReaderBuilder<MessageDB>().name("readerName")
                .repository(messageRepository)
                .methodName("findAllByTaskIdIsNotNull")
                .pageSize(10)
                .sorts(Collections.singletonMap("taskId", Sort.Direction.ASC))
                .build();
    }

    /**
     * Batch processor: wraps {@code MessageDbAttachmentsProcessor} so that it is
     * executed on {@link #taskExecutor()}.
     *
     * <p>NOTE(review): the delegate runs on an executor thread rather than the
     * thread driving the chunk, so lazy associations of {@code MessageDB}
     * touched inside the delegate presumably have no open Hibernate session
     * there. Confirm, and either fetch the associations eagerly in the reader
     * query or open a transaction inside the delegate.
     *
     * @return the asynchronous processor
     */
    @Bean
    public AsyncItemProcessor<MessageDB, MessageDB> processor() {
        AsyncItemProcessor<MessageDB, MessageDB> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(messageDbAttachmentsProcessor);
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    /**
     * Batch writer: persists items by calling {@code save} on the message
     * repository.
     *
     * @return the configured repository writer
     */
    @Bean
    public RepositoryItemWriter<MessageDB> writer() {
        // Parameterized instead of a raw type to avoid an unchecked conversion.
        RepositoryItemWriter<MessageDB> repositoryItemWriter = new RepositoryItemWriter<>();
        repositoryItemWriter.setRepository(messageRepository);
        repositoryItemWriter.setMethodName("save");
        return repositoryItemWriter;
    }

    /**
     * Writer used by the step: takes the futures produced by
     * {@link #processor()} and hands their results to {@link #writer()}.
     *
     * @return the asynchronous writer
     */
    @Bean
    public AsyncItemWriter<MessageDB> asyncWriter() {
        AsyncItemWriter<MessageDB> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        return asyncItemWriter;
    }

    /**
     * The migration job, registered under the bean name {@code attachmentsJob}.
     *
     * @param listener job execution listener attached to the job
     * @param step     the single step the job flow consists of
     * @return the built job
     */
    @Bean(name = "attachmentsJob")
    public Job migrateTaskAttachmentsJob(JobCompletionNotificationListener listener, Step step) {
        return jobBuilderFactory.get("taskAttachmentsJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step)
                .end()
                .build();
    }

    /**
     * Chunk-oriented step with a commit interval of 5 that wires together the
     * reader, the asynchronous processor and the asynchronous writer.
     *
     * @return the built step
     */
    @Bean
    public Step step() {
        return stepBuilderFactory.get("step")
                .<MessageDB, Future<MessageDB>>chunk(5)
                .reader(reader())
                .processor(processor())
                .writer(asyncWriter())
                .transactionManager(platformTransactionManager)
                .build();
    }

    /**
     * Job launcher that starts jobs on {@link #taskExecutor()}, registered
     * under the bean name {@code asyncJobLauncher}.
     *
     * @return the configured launcher
     * @throws Exception declared by the original signature; kept for compatibility
     */
    @Bean(name = "asyncJobLauncher")
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor());
        return jobLauncher;
    }
}
Когда в MessageDbAttachmentsProcessor происходит обращение к лениво загружаемой коллекции, я получаю исключение LazyInitializationException:
org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: xxx.xxx.xxx.xxx, could not initialize proxy - no Session
at org.hibernate.collection.internal.AbstractPersistentCollection.throwLazyInitializationException(AbstractPersistentCollection.java:606)
at org.hibernate.collection.internal.AbstractPersistentCollection.withTemporarySessionIfNeeded(AbstractPersistentCollection.java:218)
at org.hibernate.collection.internal.AbstractPersistentCollection.initialize(AbstractPersistentCollection.java:585)
at org.hibernate.collection.internal.AbstractPersistentCollection.write(AbstractPersistentCollection.java:409)
at org.hibernate.collection.internal.PersistentBag.add(PersistentBag.java:407)
I tried to fix it by adding @Transactional(propagation = Propagation.REQUIRES_NEW) to the migrateAttachments method, but without success; after adding it I got the following exception:
java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
I do not know what to change or in which direction to go. Any help is welcome.