Как отправить сообщение, чтобы остановить опрос и выйти из приложения

#java #spring #spring-integration

#java #весна #spring-интеграция

Вопрос:

У меня есть приложение Spring integration, и мне нужно закрыть его после обработки всех данных. Если я явно вызываю appContext.close() , то не все данные могут быть обработаны вовремя (если я не установил Thread.sleep() ). Если я не вызываю close в контексте приложения, приложение не останавливается, поскольку у меня есть фоновый опрос, который не позволяет приложению закрываться автоматически. Итак, как подать сигнал на остановку всего приложения в одном из моих активаторов службы (последнем в цепочке обработки)?

  1. первый компонент считывает данные из хранилища построчно и отправляет их gateway.send(data) через цикл while
  2. цепочка обработки, которая происходит параллельно
  3. затем все потоки отправляют сообщения в один поток через опрашиваемую очередь
  4. И здесь я должен остановить приложение, если я понимаю, что все сообщения, прочитанные в первом компоненте, были обработаны

Я попытался остановить последний активатор службы с помощью controlbus, но это не помогло

Спасибо

Обновить

вот несколько примеров кода:

Бегун:

 public class Runner {
static Logger log = LoggerFactory.getLogger(Service2.class);

public static void main(String[] args) throws InterruptedException {
    log.info("START APP");
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
    RootService service = context.getBean(RootService.class);
    service.start();
    service.stop();
    context.close();
    log.info("END APP");
}
  

}

RootService:

     @Component
public class RootService {

    Logger log = LoggerFactory.getLogger(RootService.class);

    @Autowired
    MyGateway gateway;

    int totalSize = 0;

    public void start() {
        List<String> source = generateSource();
        totalSize = source.size();
        //imitate very long but finite process
        for (String s : source) {
            gateway.send(s, totalSize);
        }
        log.info("end sending data");
    }

    public void stop() throws InterruptedException {
        log.info("sending stop signal...");
        while (gateway.sendStop(totalSize)<0) {
            Thread.sleep(100);
            log.info("sending stop signal...");
        }
        log.info("THE END");
    }


    private List<String> generateSource() {
        List<String> result = new ArrayList<String>();
        for (int i = 0; i < 15; i  ) {
            result.add("data"   i);
        }
        return resu<
    }
}
  

Сервис1

 @Component
public class Service1 {

    public String dodo(String data) throws InterruptedException {
        //doing a job in parallel
        Thread.sleep(100);
        return data   "-"   Thread.currentThread().getName();
    }
}
  

Service2:

     @Component
public class Service2 {
    Logger log = LoggerFactory.getLogger(Service2.class);
    int counter = 0;

    public void dodo(String data) {
        log.info("data: {}-{}", data, Thread.currentThread().getName());
        counter  ;
        log.info("counter: {}", counter);
    }

    public Integer dodo(Integer data) {
        if (counter < data) {
            return -1;
        } else {
            return 0;
        }
    }
}


@Component
public class ErrorHandler {

    Logger log = LoggerFactory.getLogger(Service2.class);

    public void handleError(Message<?> message) {
        log.info("ERROR: {}", message);
    }
}
  

и xml config

 <?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:task="http://www.springframework.org/schema/task">


    <gateway id="myGateway"
             service-interface="com.dimas.MyGateway"
             default-request-channel="channel1"
             error-channel="errorChannel"
             default-reply-timeout="3000">
        <method name="send" request-channel="channel1"/>
        <method name="sendStop" request-channel="channel2" reply-channel="channel3"/>
    </gateway>

    <channel id="channel1">
        <dispatcher task-executor="executor"/>
    </channel>
    <channel id="channel2">
        <queue/>
    </channel>
    <channel id="channel3">
        <queue/>
    </channel>
    <channel id="errorChannel"/>

    <service-activator input-channel="errorChannel" ref="errorHandler" method="handleError"/>

    <service-activator input-channel="channel1" output-channel="channel2" ref="service1"/>
    <service-activator input-channel="channel2" output-channel="channel3" ref="service2">
        <poller fixed-delay="0"/>
    </service-activator>

    <task:executor id="executor" pool-size="2"/>
</beans:beans>
  

Теперь это работает — приложение останавливается, когда обрабатываются все данные, и возвращает код остановки 0, но я вижу в журнале много ошибок, таких как:

Сообщение об ошибке [полезная нагрузка=org.springframework.messaging.core.Исключение DestinationResolutionException: заголовок выходного канала или канала ответа недоступен, headers={id=639ca939-8110-4486- 6a2b-5d36c7bfdbcd, временная метка=1475872251269}]

Обработчик ошибок успешно перехватил его, но что-то не так

Я понял, в чем проблема. Последний Service2 возвращает soemthing для любого сообщения о доходах. Переделал его так, чтобы он отвечал кодом только на запрос stopRequest.

Просто интересно, есть ли более простое решение

Ответ №1:

Непонятно, почему, если вы можете обнаружить, что вы завершили, вы не можете закрыть контекст приложения.

Вы можете заменить значение по умолчанию taskScheduler на значение, использующее потоки демона.

Редактировать

Прежде всего, вам не нужны все эти каналы очередей; простое использование channel1 в качестве канала-исполнителя даст вам параллелизм — использование каналов очередей после этого просто добавляет накладные расходы.

В любом случае, чтобы определить, когда процесс завершен, просто добавьте getCounter() метод в Service2; затем в вашем основном методе get service2 из контекста и подождите, пока счетчик не увеличится до ожидаемого числа.

Или вы можете добавить защелку обратного отсчета в service2 — добавьте метод для ее установки…

 CountDownLatch latch = new CountDownLatch(source.size());
service2.setCountDownLatch(latch);
for (

...
if (!latch.await(...)) { // add a timeout in case is never completes)
    // failure
}
  

Комментарии:

1. я могу обнаружить, что я опрашиваю все записи из источника, но мне все равно нужно подождать, пока все они будут обработаны. Я добавил еще один вызов к своему шлюзу, где я пингую последний активатор службы, если он обработал последнее сообщение, если да-> закрыть нет-> сон и снова пинг. Но у меня много неизвестного канала вывода или ответа

2. Извините — вашу ситуацию трудно понять; пожалуйста, предоставьте пример конфигурации и в чем именно проблема.