#java #actor #shutdown #executor
#java #актер #завершение работы #исполнитель
Вопрос:
Давайте создадим одну классику Executor
в приложении. Многие части приложения используют этот исполнитель для некоторых вычислений, каждое вычисление может быть отменено, для этого я могу вызвать shutdown()
или shutdownNow()
на Executor.
Но я хочу отключить только часть задач в Executor. К сожалению, я не могу получить доступ к Future
объектам, они являются частной частью реализации вычислений (фактически вычисления поддерживаются actor framework jetlang)
Я хочу что-то вроде оболочки исполнителя, которую я мог бы передать для вычисления и которая должна поддерживаться реальным исполнителем. Что-то вроде этого:
// main application executor
Executor applicationExecutor = Executors.newCachedThreadPool();
// starting computation
Executor computationExecutor = new ExecutorWrapper(applicationExecutor);
Computation computation = new Computation(computationExecutor);
computation.start();
// cancelling computation
computation.cancel();
// shutting down only computation tasks
computationExecutor.shutdown();
// applicationExecutor remains running and happy
Или любая другая идея?
Ответ №1:
Для тех, кто хочет хороших результатов: есть окончательное решение, частично основанное на ответе Ивана Сопова. К счастью, jetlang использует для выполнения своих задач только Executor
интерфейс (не ExecutorService
), поэтому я создаю класс-оболочку, который поддерживает остановку задач, созданных только этой оболочкой.
static class StoppableExecutor implements Executor {
final ExecutorService executor;
final List<Future<?>> futures = Lists.newArrayList();
boolean stopped;
public StoppableExecutor(ExecutorService executor) {
this.executor = executor;
}
void stop() {
this.stopped = true;
synchronized (futures) {
for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) {
Future<?> future = iterator.next();
if (!future.isDone() amp;amp; !future.isCancelled()) {
System.out.println(future.cancel(true));
}
}
futures.clear();
}
}
@Override
public void execute(Runnable command) {
if (!stopped) {
synchronized (futures) {
Future<?> newFuture = executor.submit(command);
for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) {
Future<?> future = iterator.next();
if (future.isDone() || future.isCancelled())
iterator.remove();
}
futures.add(newFuture);
}
}
}
}
Использовать это довольно просто:
ExecutorService service = Executors.newFixedThreadPool(5);
StoppableExecutor executor = new StoppableExecutor(service);
// doing some actor stuff with executor instance
PoolFiberFactory factory = new PoolFiberFactory(executor);
// stopping tasks only created on executor instance
// executor service is happily running other tasks
executor.stop();
Вот и все. Работает хорошо.
Ответ №2:
Как насчет того, чтобы ваш Computation
be a Runnable
(и запускался с использованием предоставленного Executor
) до тех пор, пока не будет установлен логический флаг? Что-то вроде :
public class Computation
{
boolean volatile stopped;
public void run(){
while(!stopped){
//do magic
}
public void cancel)(){stopped=true;}
}
То, что вы делаете, по сути, останавливает поток. Однако он не собирает мусор, а вместо этого используется повторно, поскольку он управляется исполнителем. Посмотрите «как правильно остановить поток?».
РЕДАКТИРОВАТЬ: пожалуйста, обратите внимание, что приведенный выше код довольно примитивен в том смысле, что он предполагает, что тело цикла while занимает короткое время. Если этого не произойдет, проверка будет выполняться нечасто, и вы заметите задержку между отменой задачи и ее фактической остановкой.
Комментарии:
1. Спасибо. Одно вычисление состоит из множества задач. Эти задачи являются довольно частными частями фреймворка, я могу только кодировать обратные вызовы, которые вызываются из этих задач. Обратные вызовы — это относительно длительные процессы. У меня нет выполняемых объектов, которые передаются исполнителю. И да, я хочу остановить потоки, которые принадлежат текущему вычислению.
2. Правильно. Чтобы остановить их (почти) немедленно, вам нужно прервать. Сделать это правильно немного сложно, особенно когда потоки, в которых выполняется задача, находятся в части пула. Я не могу объяснить это лучше, чем параллелизм Java Брайана Гетца на практике, глава 7.
3. Я знаю, что ваше решение является стандартным для остановки потоков. То, что я хочу, это что-то вроде «серебряной пули» 🙂 для ситуаций, когда я не могу ввести вычислительный код.
Ответ №3:
Что-то вроде этого? Вы можете выполнить частичное завершение работы:
for (Future<?> future : %ExecutorServiceWrapperInstance%.getFutures()) {
if (%CONDITION%) {
future.cancel(true);
}
}
Вот код:
package com.sopovs.moradanen;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ExecutorServiceWrapper implements ExecutorService {
private final ExecutorService realService;
private List<Future<?>> futures = new ArrayList<Future<?>>();
public ExecutorServiceWrapper(ExecutorService realService) {
this.realService = realService;
}
@Override
public void execute(Runnable command) {
realService.execute(command);
}
@Override
public void shutdown() {
realService.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return realService.shutdownNow();
}
@Override
public boolean isShutdown() {
return realService.isShutdown();
}
@Override
public boolean isTerminated() {
return realService.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return realService.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
Future<T> future = realService.submit(task);
synchronized (this) {
futures.add(future);
}
return future;
}
public synchronized List<Future<?>> getFutures() {
return Collections.unmodifiableList(futures);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
Future<T> future = realService.submit(task, result);
synchronized (this) {
futures.add(future);
}
return future;
}
@Override
public Future<?> submit(Runnable task) {
Future<?> future = realService.submit(task);
synchronized (this) {
futures.add(future);
}
return future;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
List<Future<T>> future = realService.invokeAll(tasks);
synchronized (this) {
futures.addAll(future);
}
return future;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
List<Future<T>> future = realService.invokeAll(tasks, timeout, unit);
synchronized (this) {
futures.addAll(future);
}
return future;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
//don't know what to do here. Maybe this method is not needed by the framework
//than just throw new NotImplementedException();
return realService.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
//don't know what to do here. Maybe this method is not needed by the framework
//than just throw new NotImplementedException();
return realService.invokeAny(tasks, timeout, unit);
}
}
Комментарии:
1. Спасибо, наконец-то у меня есть собственное решение, которое очень слегка вдохновлено вашим ответом.