Добавление 5-го прослушивателя Rabbit приводит к зависанию задания Quartz в заблокированном состоянии

#spring-boot #quartz-scheduler #spring-rabbit

#весенняя загрузка #quartz-планировщик #spring-rabbit

Вопрос:

У меня есть приложение Spring Boot, в котором есть 5 RabbitListener. У меня также есть 2 задания планировщика Quartz, которые выполняются через регулярные промежутки времени. когда я запускаю свое приложение Spring Boot, кажется, что задания выполняются один раз, но затем остаются в состоянии запуска ЗАБЛОКИРОВАННЫМИ и больше не запускаются. Я использую Spring Boot 2.1.2 и Quartz 2.3.0. Кто-нибудь может посоветовать?

Раньше у меня было 4 прослушивателя Rabbit, и у меня не было этой проблемы, но добавление 5-го вызывает это. Я попытался удалить существующие прослушиватели Rabbit, поэтому осталось только 4 прослушивателя Rabbit, включая мой новый, и проблема исчезла.

Вот прослушиватель Rabbit, который я хочу добавить:

 @Slf4j
@Component
public class MyListener
{
    @Autowired
    private MyProcess process;

    @RabbitListener(queues = "#{T(com.earthport.elm.amqp.RabbitBinding).LIQUIDITY_BALANCE_CREDITED.getQueue()}",
            containerFactory = "noDeadLetterRabbitListenerContainerFactory")
    public void onMessage(MyMessage message)
    {
        if (log.isDebugEnabled())
        {
            log.debug("Received message: {}", message);
        }

        try
        {
            process.run(message);
        }
        catch (Exception e)
        {
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }
}
  

Вот моя заводская конфигурация контейнера прослушивателя:

     @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            Jackson2JsonMessageConverter messageConverter,
            RetryAdviceChainFactory deadLetterRetryAdviceChainFactory)
    {
        final SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
        cf.setConnectionFactory(connectionFactory);
        cf.setConcurrentConsumers(1);
        cf.setErrorHandler(defaultErrorHandler);
        cf.setAdviceChain(deadLetterRetryAdviceChainFactory.createDefaultRetryChain());
        cf.setChannelTransacted(true);
        cf.setTaskExecutor(taskExecutor);
        cf.setMessageConverter(messageConverter);
        cf.setTransactionManager(transactionManager);
        return cf;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory noDeadLetterRabbitListenerContainerFactory(
            Jackson2JsonMessageConverter messageConverter,
            RetryAdviceChainFactory loggingRetryAdviceChainFactory)
    {
        final SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
        cf.setConnectionFactory(connectionFactory);
        cf.setConcurrentConsumers(1);
        cf.setErrorHandler(defaultErrorHandler);
        cf.setAdviceChain(loggingRetryAdviceChainFactory.createRetryChain(
                new NoBackOffPolicy(), new NeverRetryPolicy()));
        cf.setChannelTransacted(true);
        cf.setTaskExecutor(taskExecutor);
        cf.setMessageConverter(messageConverter);
        cf.setTransactionManager(transactionManager);
        return cf;
    }
  

Вот моя заводская конфигурация контейнера прослушивателя:

     @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext)
    {
        AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
        jobFactory.setApplicationContext(applicationContext);
        return jobFactory;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(
            DataSource dataSource,
            JobFactory jobFactory,
            Executor taskExecutor,
            PlatformTransactionManager platformTransactionManager)
    {
        Properties quartzProperties = new Properties();

        quartzProperties.setProperty("org.quartz.scheduler.instanceName", "ELM-Scheduler");
        quartzProperties.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        quartzProperties.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        quartzProperties.setProperty("org.quartz.jobStore.useProperties", "true");
        quartzProperties.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        quartzProperties.setProperty("org.quartz.jobStore.isClustered", "true");

        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setOverwriteExistingJobs(true);
        factory.setAutoStartup(true);
        factory.setDataSource(dataSource);
        factory.setJobFactory(jobFactory);
        factory.setTaskExecutor(taskExecutor);
        factory.setTransactionManager(platformTransactionManager);
        factory.setQuartzProperties(quartzProperties);

        if (jobs != null)
        {
            factory.setJobDetails(jobs.toArray(new JobDetail[jobs.size()]));
        }

        if (triggers != null amp;amp; schedulerEnabled)
        {
            factory.setTriggers(triggers.toArray(new Trigger[triggers.size()]));
        }

        return factory;
    }


    @Bean
    public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean)
    {
        return schedulerFactoryBean.getScheduler();
    }

    public static SimpleTriggerFactoryBean createTrigger(JobDetail jobDetail, long pollFrequencyMs, long startDelayMillis)
    {
        SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();
        factoryBean.setJobDetail(jobDetail);
        factoryBean.setStartDelay(startDelayMillis);
        factoryBean.setRepeatInterval(pollFrequencyMs);
        factoryBean.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
        factoryBean.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT);
        return factoryBean;
    }
  

Вот общий TaskExcutor, используемый в моем Quartz и RabbitConfig:

 @Configuration
@EnableAsync
public class AsyncConfig
{
    @Value("${executor.maxPoolSize:50}")
    private int maxPoolSize;

    @Value("${executor.maxQueueCapcity:500}")
    private int maxQueueCapacity;

    @Bean
    public Executor taskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(maxQueueCapacity);
        return executor;
    }
}
  

Я ожидаю, что мои запланированные задания будут выполняться через регулярные промежутки времени. Когда это работает, я вижу в своей таблице QRTZ_TRIGGERS, что TRIGGER_STATE не заблокирован. Однако при добавлении моего 5-го прослушивателя это значение блокируется и остается там.

Комментарии:

1. Я серьезно сомневаюсь, что они связаны, если вы не используете общий TaskExecutor доступ с ограниченным количеством потоков. Покажите заводскую конфигурацию вашего контейнера-прослушивателя.

2. Я только что добавил конфигурацию к своему вопросу. Я также включил свою конфигурацию Quarts.

3. Также мои конфигурации Quartz и Rabbit используют один и тот же компонент TaskExecutor, который я добавил.

4. Какое executor.maxPoolSize свойство установлено? Если это больше 5, я не знаю, почему вы не получите более 5 потоков. Вы пробовали увеличивать corePoolSize ? Или используйте другой исполнитель для quartz. Вы можете использовать отладчик для просмотра очереди задач исполнителя задач.

5. Значение executor.maxPoolSize по умолчанию равно 50. Я увеличил corePoolSize с 5 до 10, и это решило проблему. Спасибо.