Обработка исключения, вызванного вызываемым объектом из пула потоков

#java #spring #multithreading

#java #spring #многопоточность

Вопрос:

У меня есть пружина, ThreadPoolTaskExecutor я submit выполняю некоторые Callable задачи для этого Executor .

Внутри Task я использую dynamic Map для установки некоторых значений. И Future это Callable может быть использовано для отмены этого потока. Перед запуском этого Callable я инициализирую некоторые условия, которые аннулируются или возвращаются обратно, когда поток завершает выполнение.

Может быть случай, когда Task не был запущен и он отменен. Это означает, что условия были инициализированы. Но когда поток, который не был запущен и все еще находится в пуле, отменяется, я не могу аннулировать свои инициализации, поскольку call метод никогда не вызывается.

Я читал об этом, и если бы это был Runnable поток, то я мог бы обработать его с помощью UncaughtExceptionHandler . Или, если бы я использовал future.get() для ожидания результата, тогда я мог бы справиться с ExecutionException . Другое решение — переопределить afterExecute() но я не смог найти это в ThreadPoolTaskExecutor , также я не очень уверен в этом подходе.

Итак, как мне справиться с этим в этом случае?

Приведенный ниже код вызывается из демонического потока, ожидающего BlockingQueue :

 public void process(View view)
{
    //getMapOfViewsAndFuture and getMapOfViewsPersistingLocks fetch the ConcurrentHashMaps
    viewController.getMapOfViewsAndFuture().remove(view.getId());
    viewController.getMapOfViewsPersistingLocks().put(view.getId(), new ReentrantLock());
    Callable<WebResponse> calculatePI = (Callable<WebResponse>) mAppContext.getBean("piCalculator", view.getId()
        ,viewController.getMapOfViewsPersistingLocks().get(view.getId()), viewController.getMapOfViewsPrintingLocks().get(view.getId()));
    Future<WebResponse> future = mExecutor.submit(calculatePI);
    viewController.getMapOfViewsAndFuture().put(view.getId(), future);
}
  

Вызываемый объект (вычислитель) выглядит следующим образом:

 class PICalculator implements Callable<WebResponse>
{   
    @Override
    public WebResponse call()
    {
        try
        {
            //business logic
            mWebResponse = getResponse();
        }
        catch(Exception e)
        {
            //log the exceprtion
        }
        finally
        {
            //remove this entry from datasets
            viewController.getMapOfViewsAndFuture().remove(mViewId);
            viewController.getListOfCalculatingViews().remove((Integer)mViewId);
            viewController.getMapOfViewsPersistingLocks().remove(mViewId);
        }

        return mWebResponse;
    }
}
  

Комментарии:

1. заключите весь блок кода метода call() в блок try{}catch{}finallly{}. В вашем коде даже для окончательного{} блокирования требуется try{}catch{}

Ответ №1:

Хороший вопрос, с помощью Spring ThreadPoolTaskExecutor вы можете предоставить конфигурацию для ThreadPoolExecutor (встроенной Java) like — corePoolSize, maxPoolSize и т.д., Но вы не можете установить свой пользовательский ThreadPoolExecutor.

В этом случае я предложу вам использовать ConcurrentTaskExecutor от Spring,

  1. Создайте объект Java ThreadPoolExecutor с требуемой конфигурацией. Смотрите ниже конструктор:

      public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) 
      

Здесь вы должны предоставить свой собственный объект очереди.

  1. Теперь вы можете переопределить метод afterExecute() для очистки настроек, как показано ниже:

         ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(3,5,5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)){
    
                protected void afterExecute(Runnable r, Throwable t) {
                    super.afterExecute(r,t);
                    //Move your cleanup code here
                    //remove this entry from datasets
                   viewController.getMapOfViewsAndFuture().remove(mViewId);
              viewController.getListOfCalculatingViews().remove((Integer)mViewId);
                   viewController.getMapOfViewsPersistingLocks().remove(mViewId);
                    System.out.println("do cleanup here");
               }
        };
      
  2. Теперь установите этот исполнитель в объект ConcurrentTaskExecutor.

     ConcurrentTaskExecutor.setConcurrentExecutor(myExecutor);
      

    Вы даже можете передать свой executor в конструкторе ConcurrentTaskExecutor.

       ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(myExecutor);   
      

Теперь ваш taskExecutor готов к обработке ваших задач, просто вызовите submit(callable) метод для передачи вызываемых объектов / исполняемых объектов (задач). Надеюсь, это может вам помочь.