Почему ThreadPoolTaskScheduler запускает задачу несколько раз одновременно?

#java #spring

Вопрос:

Я разработал приложение mqtt с помощью spring и Netty. Периодически происходит MqttPingScheduleHandler отправка запросов на пинг. У него есть @PostConstruct метод, который запускает выполняемую задачу (внутренний класс) ThreadPoolTaskScheduler . Если ответ ping не получен в течение нескольких секунд (20 секунд), публикуется PingTimeoutEvent a. Вот как это происходит в первый раз.

 2021-09-21 18:58:41.861 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run
FINE: Sent ping request. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@14b83570. Message MqttMessage[fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], variableHeader=, payload=].
2021-09-21 18:59:01.873 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@14b83570
2021-09-21 18:59:01.875 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@13b5ae8
2021-09-21 18:59:01.876 ru.maxeltr.mqttClient.Mqtt.MqttClientImpl reconnect
INFO: Start reconnect! Attempt 1.
 

Все ломается во второй раз, когда ответ на пинг снова не получен. Несколько событий публикуются с минимальной разницей во времени (1-3 мс). Количество одновременных запусков может достигать 10 и более.

 2021-09-21 19:01:28.778 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run
FINE: Sent ping request. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a. Message MqttMessage[fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], variableHeader=, payload=].
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run <-- It should be run in 20 seconds!
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@4dbee29a
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run <-- It should be run in 40 seconds!
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@4dbee29a
2021-09-21 19:01:28.780 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run <-- It should be run in 60 seconds and so on!
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a
2021-09-21 19:01:28.780 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@4dbee29a
2021-09-21 19:01:28.782 ru.maxeltr.mqttClient.Mqtt.MqttClientImpl reconnect
INFO: Start reconnect! Attempt 2.
 

Я не могу понять, что происходит. Я думаю run() , что метод многократно запущен ThreadPoolTaskScheduler . Почему run() метод выполняется несколько раз одновременно?

 /*
 * For scheduling a ping request with a fixed delay which is defined by config (MqttPingScheduleHandler)
 */
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    threadPoolTaskScheduler.setPoolSize(5);
    threadPoolTaskScheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
    return threadPoolTaskScheduler;
}

/*
 * For scheduling a ping request with a fixed delay which is defined by config (MqttPingScheduleHandler)
 */
@Bean
public PeriodicTrigger pingPeriodicTrigger(Config config) {
    String keepAliveTimer = config.getProperty("keepAliveTimer", "20");
    if (keepAliveTimer.trim().isEmpty()) {
        throw new IllegalStateException("Invalid keepAliveTimer property");
    }
    PeriodicTrigger periodicTrigger = new PeriodicTrigger(Long.parseLong(keepAliveTimer, 10), TimeUnit.SECONDS);
    periodicTrigger.setFixedRate(true);
    periodicTrigger.setInitialDelay(Long.parseLong(keepAliveTimer, 10));
    return periodicTrigger;
}
 
   

  public class MqttPingScheduleHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger logger = Logger.getLogger(MqttPingHandler.class.getName());
        private final Config config;
        private ApplicationEventPublisher applicationEventPublisher;
        private ThreadPoolTaskScheduler taskScheduler;
        private PeriodicTrigger periodicTrigger;
        private boolean pingRequestWasSent;
        private ChannelHandlerContext ctx;
        private ScheduledFuture<?> future;
        
        public MqttPingScheduleHandler(Config config, ThreadPoolTaskScheduler taskScheduler, PeriodicTrigger periodicTrigger, ApplicationEventPublisher applicationEventPublisher) {
            this.config = config;
            this.taskScheduler = taskScheduler;
            this.periodicTrigger = periodicTrigger;
            this.applicationEventPublisher = applicationEventPublisher;
            logger.log(Level.FINE, String.format("Create ping handler: %s", this));
            System.out.println(String.format("Create ping handler: %s", this));
        }
        
        @PostConstruct
        public void scheduleRunnableWithCronTrigger() {
            this.future = taskScheduler.schedule(new RunnableTask(), periodicTrigger);
        }
        
        private void publishPingTimeoutEvent() {
            applicationEventPublisher.publishEvent(new PingTimeoutEvent(this, "Ping response was not received for keepAlive time."));
            System.out.println(String.format("Publish PingTimeoutEvent. %s", this));
            logger.log(Level.WARNING, String.format("Publish PingTimeoutEvent. %s", this));
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ...
            switch (message.fixedHeader().messageType()) {
                case PINGRESP:
                    logger.log(Level.FINE, String.format("Received ping response %s.", msg));
                    System.out.println(String.format("Received ping response %s.", msg));
                    this.pingRequestWasSent = false;
                    break;
            ...
        }
    
        class RunnableTask implements Runnable {
            public RunnableTask() {
                System.out.println(String.format("Create sending ping task in ping handler. %s", this));
                logger.log(Level.FINE, String.format("Create sending ping task in ping handler. %s.", this));
            }
    
            @Override
            public void run() {
                if (pingRequestWasSent) {
                    System.out.println(String.format("Ping response was not received for keepAlive time. %s", this));
                    logger.log(Level.WARNING, String.format("Ping response was not received for keepAlive time. %s", this));
                    publishPingTimeoutEvent();
                    return;
                }
    
                MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
                MqttMessage msg = new MqttMessage(fixedHeader);
                ctx.writeAndFlush(msg);
                pingRequestWasSent = true;
                logger.log(Level.FINE, String.format("Sent ping request. %s. Message %s.", this, msg));
                System.out.println(String.format("Sent ping request. %s. Message %s.", this, msg));
            }
        }
    }
 

Ответ №1:

Проблема, похоже, в том periodicTrigger.setFixedRate(true) . Фиксированная скорость означает, что запланированное задание будет запущено независимо от того, было ли выполнено предыдущее задание; время измеряется с начала предыдущего выполнения. С другой стороны, фиксированная задержка означает, что следующее выполнение произойдет после завершения предыдущего выполнения; время измеряется с момента окончания предыдущего выполнения. Видеть PeriodicTrigger#nextExecutionTime

Комментарии:

1. Ты прав. метод run() запускается несколько раз из-за накопления выполнений в очереди.