#spring-boot #quartz-scheduler #spring-rabbit
#весенняя загрузка #quartz-планировщик #spring-rabbit
Вопрос:
У меня есть приложение Spring Boot, в котором есть 5 RabbitListener. У меня также есть 2 задания планировщика Quartz, которые выполняются через регулярные промежутки времени. когда я запускаю свое приложение Spring Boot, кажется, что задания выполняются один раз, но затем остаются в состоянии запуска ЗАБЛОКИРОВАННЫМИ и больше не запускаются. Я использую Spring Boot 2.1.2 и Quartz 2.3.0. Кто-нибудь может посоветовать?
Раньше у меня было 4 прослушивателя Rabbit, и у меня не было этой проблемы, но добавление 5-го вызывает это. Я попытался удалить существующие прослушиватели Rabbit, поэтому осталось только 4 прослушивателя Rabbit, включая мой новый, и проблема исчезла.
Вот прослушиватель Rabbit, который я хочу добавить:
@Slf4j
@Component
public class MyListener
{
@Autowired
private MyProcess process;
@RabbitListener(queues = "#{T(com.earthport.elm.amqp.RabbitBinding).LIQUIDITY_BALANCE_CREDITED.getQueue()}",
containerFactory = "noDeadLetterRabbitListenerContainerFactory")
public void onMessage(MyMessage message)
{
if (log.isDebugEnabled())
{
log.debug("Received message: {}", message);
}
try
{
process.run(message);
}
catch (Exception e)
{
throw new AmqpRejectAndDontRequeueException(e);
}
}
}
Вот моя заводская конфигурация контейнера прослушивателя:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
Jackson2JsonMessageConverter messageConverter,
RetryAdviceChainFactory deadLetterRetryAdviceChainFactory)
{
final SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
cf.setConnectionFactory(connectionFactory);
cf.setConcurrentConsumers(1);
cf.setErrorHandler(defaultErrorHandler);
cf.setAdviceChain(deadLetterRetryAdviceChainFactory.createDefaultRetryChain());
cf.setChannelTransacted(true);
cf.setTaskExecutor(taskExecutor);
cf.setMessageConverter(messageConverter);
cf.setTransactionManager(transactionManager);
return cf;
}
@Bean
public SimpleRabbitListenerContainerFactory noDeadLetterRabbitListenerContainerFactory(
Jackson2JsonMessageConverter messageConverter,
RetryAdviceChainFactory loggingRetryAdviceChainFactory)
{
final SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
cf.setConnectionFactory(connectionFactory);
cf.setConcurrentConsumers(1);
cf.setErrorHandler(defaultErrorHandler);
cf.setAdviceChain(loggingRetryAdviceChainFactory.createRetryChain(
new NoBackOffPolicy(), new NeverRetryPolicy()));
cf.setChannelTransacted(true);
cf.setTaskExecutor(taskExecutor);
cf.setMessageConverter(messageConverter);
cf.setTransactionManager(transactionManager);
return cf;
}
Вот моя заводская конфигурация контейнера прослушивателя:
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext)
{
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(
DataSource dataSource,
JobFactory jobFactory,
Executor taskExecutor,
PlatformTransactionManager platformTransactionManager)
{
Properties quartzProperties = new Properties();
quartzProperties.setProperty("org.quartz.scheduler.instanceName", "ELM-Scheduler");
quartzProperties.setProperty("org.quartz.scheduler.instanceId", "AUTO");
quartzProperties.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
quartzProperties.setProperty("org.quartz.jobStore.useProperties", "true");
quartzProperties.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
quartzProperties.setProperty("org.quartz.jobStore.isClustered", "true");
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setOverwriteExistingJobs(true);
factory.setAutoStartup(true);
factory.setDataSource(dataSource);
factory.setJobFactory(jobFactory);
factory.setTaskExecutor(taskExecutor);
factory.setTransactionManager(platformTransactionManager);
factory.setQuartzProperties(quartzProperties);
if (jobs != null)
{
factory.setJobDetails(jobs.toArray(new JobDetail[jobs.size()]));
}
if (triggers != null amp;amp; schedulerEnabled)
{
factory.setTriggers(triggers.toArray(new Trigger[triggers.size()]));
}
return factory;
}
@Bean
public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean)
{
return schedulerFactoryBean.getScheduler();
}
public static SimpleTriggerFactoryBean createTrigger(JobDetail jobDetail, long pollFrequencyMs, long startDelayMillis)
{
SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();
factoryBean.setJobDetail(jobDetail);
factoryBean.setStartDelay(startDelayMillis);
factoryBean.setRepeatInterval(pollFrequencyMs);
factoryBean.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
factoryBean.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT);
return factoryBean;
}
Вот общий TaskExcutor, используемый в моем Quartz и RabbitConfig:
@Configuration
@EnableAsync
public class AsyncConfig
{
@Value("${executor.maxPoolSize:50}")
private int maxPoolSize;
@Value("${executor.maxQueueCapcity:500}")
private int maxQueueCapacity;
@Bean
public Executor taskExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(maxQueueCapacity);
return executor;
}
}
Я ожидаю, что мои запланированные задания будут выполняться через регулярные промежутки времени. Когда это работает, я вижу в своей таблице QRTZ_TRIGGERS, что TRIGGER_STATE не заблокирован. Однако при добавлении моего 5-го прослушивателя это значение блокируется и остается там.
Комментарии:
1. Я серьезно сомневаюсь, что они связаны, если вы не используете общий
TaskExecutor
доступ с ограниченным количеством потоков. Покажите заводскую конфигурацию вашего контейнера-прослушивателя.2. Я только что добавил конфигурацию к своему вопросу. Я также включил свою конфигурацию Quarts.
3. Также мои конфигурации Quartz и Rabbit используют один и тот же компонент TaskExecutor, который я добавил.
4. Какое
executor.maxPoolSize
свойство установлено? Если это больше 5, я не знаю, почему вы не получите более 5 потоков. Вы пробовали увеличиватьcorePoolSize
? Или используйте другой исполнитель для quartz. Вы можете использовать отладчик для просмотра очереди задач исполнителя задач.5. Значение executor.maxPoolSize по умолчанию равно 50. Я увеличил corePoolSize с 5 до 10, и это решило проблему. Спасибо.