#java #spring #multithreading
#java #spring #многопоточность
Вопрос:
У меня есть пружина, ThreadPoolTaskExecutor
я submit
выполняю некоторые Callable
задачи для этого Executor
.
Внутри Task
я использую dynamic Map
для установки некоторых значений. И Future
это Callable
может быть использовано для отмены этого потока. Перед запуском этого Callable
я инициализирую некоторые условия, которые аннулируются или возвращаются обратно, когда поток завершает выполнение.
Может быть случай, когда Task
не был запущен и он отменен. Это означает, что условия были инициализированы. Но когда поток, который не был запущен и все еще находится в пуле, отменяется, я не могу аннулировать свои инициализации, поскольку call
метод никогда не вызывается.
Я читал об этом, и если бы это был Runnable
поток, то я мог бы обработать его с помощью UncaughtExceptionHandler
. Или, если бы я использовал future.get()
для ожидания результата, тогда я мог бы справиться с ExecutionException
. Другое решение — переопределить afterExecute()
но я не смог найти это в ThreadPoolTaskExecutor
, также я не очень уверен в этом подходе.
Итак, как мне справиться с этим в этом случае?
Приведенный ниже код вызывается из демонического потока, ожидающего BlockingQueue
:
public void process(View view)
{
//getMapOfViewsAndFuture and getMapOfViewsPersistingLocks fetch the ConcurrentHashMaps
viewController.getMapOfViewsAndFuture().remove(view.getId());
viewController.getMapOfViewsPersistingLocks().put(view.getId(), new ReentrantLock());
Callable<WebResponse> calculatePI = (Callable<WebResponse>) mAppContext.getBean("piCalculator", view.getId()
,viewController.getMapOfViewsPersistingLocks().get(view.getId()), viewController.getMapOfViewsPrintingLocks().get(view.getId()));
Future<WebResponse> future = mExecutor.submit(calculatePI);
viewController.getMapOfViewsAndFuture().put(view.getId(), future);
}
Вызываемый объект (вычислитель) выглядит следующим образом:
class PICalculator implements Callable<WebResponse>
{
@Override
public WebResponse call()
{
try
{
//business logic
mWebResponse = getResponse();
}
catch(Exception e)
{
//log the exceprtion
}
finally
{
//remove this entry from datasets
viewController.getMapOfViewsAndFuture().remove(mViewId);
viewController.getListOfCalculatingViews().remove((Integer)mViewId);
viewController.getMapOfViewsPersistingLocks().remove(mViewId);
}
return mWebResponse;
}
}
Комментарии:
1. заключите весь блок кода метода call() в блок try{}catch{}finallly{}. В вашем коде даже для окончательного{} блокирования требуется try{}catch{}
Ответ №1:
Хороший вопрос, с помощью Spring ThreadPoolTaskExecutor
вы можете предоставить конфигурацию для ThreadPoolExecutor
(встроенной Java) like — corePoolSize, maxPoolSize
и т.д., Но вы не можете установить свой пользовательский ThreadPoolExecutor.
В этом случае я предложу вам использовать ConcurrentTaskExecutor от Spring,
-
Создайте объект Java ThreadPoolExecutor с требуемой конфигурацией. Смотрите ниже конструктор:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
Здесь вы должны предоставить свой собственный объект очереди.
-
Теперь вы можете переопределить метод afterExecute() для очистки настроек, как показано ниже:
ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(3,5,5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)){ protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r,t); //Move your cleanup code here //remove this entry from datasets viewController.getMapOfViewsAndFuture().remove(mViewId); viewController.getListOfCalculatingViews().remove((Integer)mViewId); viewController.getMapOfViewsPersistingLocks().remove(mViewId); System.out.println("do cleanup here"); } };
-
Теперь установите этот исполнитель в объект ConcurrentTaskExecutor.
ConcurrentTaskExecutor.setConcurrentExecutor(myExecutor);
Вы даже можете передать свой executor в конструкторе ConcurrentTaskExecutor.
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(myExecutor);
Теперь ваш taskExecutor
готов к обработке ваших задач, просто вызовите submit(callable)
метод для передачи вызываемых объектов / исполняемых объектов (задач). Надеюсь, это может вам помочь.