#java #spring
Вопрос:
Я разработал приложение mqtt с помощью spring и Netty. Периодически происходит MqttPingScheduleHandler
отправка запросов на пинг. У него есть @PostConstruct
метод, который запускает выполняемую задачу (внутренний класс) ThreadPoolTaskScheduler
. Если ответ ping не получен в течение нескольких секунд (20 секунд), публикуется PingTimeoutEvent
a. Вот как это происходит в первый раз.
2021-09-21 18:58:41.861 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run
FINE: Sent ping request. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@14b83570. Message MqttMessage[fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], variableHeader=, payload=].
2021-09-21 18:59:01.873 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@14b83570
2021-09-21 18:59:01.875 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@13b5ae8
2021-09-21 18:59:01.876 ru.maxeltr.mqttClient.Mqtt.MqttClientImpl reconnect
INFO: Start reconnect! Attempt 1.
Все ломается во второй раз, когда ответ на пинг снова не получен. Несколько событий публикуются с минимальной разницей во времени (1-3 мс). Количество одновременных запусков может достигать 10 и более.
2021-09-21 19:01:28.778 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run
FINE: Sent ping request. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a. Message MqttMessage[fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], variableHeader=, payload=].
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run <-- It should be run in 20 seconds!
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@4dbee29a
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run <-- It should be run in 40 seconds!
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a
2021-09-21 19:01:28.779 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@4dbee29a
2021-09-21 19:01:28.780 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask run <-- It should be run in 60 seconds and so on!
WARNING: Ping response was not received for keepAlive time. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler$RunnableTask@6f6c557a
2021-09-21 19:01:28.780 ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler publishPingTimeoutEvent
WARNING: Publish PingTimeoutEvent. ru.maxeltr.mqttClient.Mqtt.MqttPingScheduleHandler@4dbee29a
2021-09-21 19:01:28.782 ru.maxeltr.mqttClient.Mqtt.MqttClientImpl reconnect
INFO: Start reconnect! Attempt 2.
Я не могу понять, что происходит. Я думаю run()
, что метод многократно запущен ThreadPoolTaskScheduler
. Почему run()
метод выполняется несколько раз одновременно?
/*
* For scheduling a ping request with a fixed delay which is defined by config (MqttPingScheduleHandler)
*/
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(5);
threadPoolTaskScheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
return threadPoolTaskScheduler;
}
/*
* For scheduling a ping request with a fixed delay which is defined by config (MqttPingScheduleHandler)
*/
@Bean
public PeriodicTrigger pingPeriodicTrigger(Config config) {
String keepAliveTimer = config.getProperty("keepAliveTimer", "20");
if (keepAliveTimer.trim().isEmpty()) {
throw new IllegalStateException("Invalid keepAliveTimer property");
}
PeriodicTrigger periodicTrigger = new PeriodicTrigger(Long.parseLong(keepAliveTimer, 10), TimeUnit.SECONDS);
periodicTrigger.setFixedRate(true);
periodicTrigger.setInitialDelay(Long.parseLong(keepAliveTimer, 10));
return periodicTrigger;
}
public class MqttPingScheduleHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(MqttPingHandler.class.getName());
private final Config config;
private ApplicationEventPublisher applicationEventPublisher;
private ThreadPoolTaskScheduler taskScheduler;
private PeriodicTrigger periodicTrigger;
private boolean pingRequestWasSent;
private ChannelHandlerContext ctx;
private ScheduledFuture<?> future;
public MqttPingScheduleHandler(Config config, ThreadPoolTaskScheduler taskScheduler, PeriodicTrigger periodicTrigger, ApplicationEventPublisher applicationEventPublisher) {
this.config = config;
this.taskScheduler = taskScheduler;
this.periodicTrigger = periodicTrigger;
this.applicationEventPublisher = applicationEventPublisher;
logger.log(Level.FINE, String.format("Create ping handler: %s", this));
System.out.println(String.format("Create ping handler: %s", this));
}
@PostConstruct
public void scheduleRunnableWithCronTrigger() {
this.future = taskScheduler.schedule(new RunnableTask(), periodicTrigger);
}
private void publishPingTimeoutEvent() {
applicationEventPublisher.publishEvent(new PingTimeoutEvent(this, "Ping response was not received for keepAlive time."));
System.out.println(String.format("Publish PingTimeoutEvent. %s", this));
logger.log(Level.WARNING, String.format("Publish PingTimeoutEvent. %s", this));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
...
switch (message.fixedHeader().messageType()) {
case PINGRESP:
logger.log(Level.FINE, String.format("Received ping response %s.", msg));
System.out.println(String.format("Received ping response %s.", msg));
this.pingRequestWasSent = false;
break;
...
}
class RunnableTask implements Runnable {
public RunnableTask() {
System.out.println(String.format("Create sending ping task in ping handler. %s", this));
logger.log(Level.FINE, String.format("Create sending ping task in ping handler. %s.", this));
}
@Override
public void run() {
if (pingRequestWasSent) {
System.out.println(String.format("Ping response was not received for keepAlive time. %s", this));
logger.log(Level.WARNING, String.format("Ping response was not received for keepAlive time. %s", this));
publishPingTimeoutEvent();
return;
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage msg = new MqttMessage(fixedHeader);
ctx.writeAndFlush(msg);
pingRequestWasSent = true;
logger.log(Level.FINE, String.format("Sent ping request. %s. Message %s.", this, msg));
System.out.println(String.format("Sent ping request. %s. Message %s.", this, msg));
}
}
}
Ответ №1:
Проблема, похоже, в том periodicTrigger.setFixedRate(true)
. Фиксированная скорость означает, что запланированное задание будет запущено независимо от того, было ли выполнено предыдущее задание; время измеряется с начала предыдущего выполнения. С другой стороны, фиксированная задержка означает, что следующее выполнение произойдет после завершения предыдущего выполнения; время измеряется с момента окончания предыдущего выполнения. Видеть PeriodicTrigger#nextExecutionTime
Комментарии:
1. Ты прав. метод run() запускается несколько раз из-за накопления выполнений в очереди.