#java #asynchronous #future #futuretask
#java #асинхронный #будущее #будущая задача
Вопрос:
Я хочу создать DAG из задач на Java, где задачи могут зависеть от результатов других задач. Если между двумя задачами нет направленного пути, они могут выполняться параллельно. Задачи могут быть отменены. Если какая-либо задача вызывает исключение, все задачи отменяются.
Я хотел использовать CompleteableFuture
для этого, но, несмотря на реализацию Future
интерфейса (в том числе Future.cancel(boolean)
, CompletableFuture
не поддерживает отмену — CompletableFuture.cancel(true)
просто игнорируется. (Кто-нибудь знает почему?)
Поэтому я прибегаю к созданию своей собственной базы данных задач с использованием Future
. Это очень шаблонно, и в нем сложно разобраться правильно. Есть ли какой-нибудь лучший метод, чем этот?
Вот один из примеров:
- Я хочу вызвать
Process process = Runtime.getRuntime().exec(cmd)
, чтобы запустить процесс командной строки, создавFuture<Process>
. Затем я хочу запустить (разветвить) три подзадачи:- Одна задача, которая потребляет входные данные от
process.getInputStream()
- Одна задача, которая потребляет входные данные от
process.getErrorStream()
- Одна задача, которая вызывает
process.waitFor()
, а затем ожидает результата.
- Одна задача, которая потребляет входные данные от
- Затем я хочу дождаться завершения всех трех запущенных подзадач (т. Е. Барьер ввода-вывода / завершения). Это должно быть собрано в финале
Future<Integer> exitCode
, который собирает код выхода, возвращенныйprocess.waitFor()
задачей. Две входные потребительские задачи просто возвращаютсяVoid
, поэтому их вывод можно игнорировать, но барьер завершения все равно должен ждать их завершения.
Я хочу, чтобы сбой в любой из запущенных подзадач привел к отмене всех подзадач и уничтожению базового процесса.
Обратите внимание, что Process process = Runtime.getRuntime().exec(cmd)
на первом шаге может возникнуть исключение, которое должно привести к каскадированию сбоя на всем пути exitCode
.
@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
public void accept(T val) throws IOException;
}
public static Future<Integer> exec(
ConsumerThrowingIOException<InputStream> stdoutConsumer,
ConsumerThrowingIOException<InputStream> stderrConsumer,
String... cmd) {
Future<Process> processFuture = executor.submit(
() -> Runtime.getRuntime().exec(cmd));
AtomicReference<Future<Void>> stdoutProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Void>> stderrProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Integer>> exitCodeFuture = //
new AtomicReference<>();
Runnable cancel = () -> {
try {
processFuture.get().destroy();
} catch (Exception e) {
// Ignore (exitCodeFuture.get() will still detect exceptions)
}
if (stdoutProcessorFuture.get() != null) {
stdoutProcessorFuture.get().cancel(true);
}
if (stderrProcessorFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
if (exitCodeFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
};
if (stdoutConsumer != null) {
stdoutProcessorFuture.set(executor.submit(() -> {
try {
InputStream inputStream = processFuture.get()
.getInputStream();
stdoutConsumer.accept(inputStream != null
? inputStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
if (stderrConsumer != null) {
stderrProcessorFuture.set(executor.submit(() -> {
try {
InputStream errorStream = processFuture.get()
.getErrorStream();
stderrConsumer.accept(errorStream != null
? errorStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
exitCodeFuture.set(executor.submit(() -> {
try {
return processFuture.get().waitFor();
} catch (Exception e) {
cancel.run();
throw e;
}
}));
// Async completion barrier -- wait for process to exit,
// and for output processors to complete
return executor.submit(() -> {
Exception exception = null;
int exitCode = 1;
try {
exitCode = exitCodeFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
exception = e;
}
if (stderrProcessorFuture.get() != null) {
try {
stderrProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (stdoutProcessorFuture.get() != null) {
try {
stdoutProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (exception != null) {
throw exception;
} else {
return exitCode;
}
});
}
Примечание: я понимаю, что Runtime.getRuntime().exec(cmd)
это должно быть неблокирующим, поэтому не требует своего собственного Future
, но я все равно написал код, используя его, чтобы указать на построение DAG.
Комментарии:
1. CompletableFuture не поддерживает отмену, если только вы не выключите базовый пул и ваши потоки не отреагируют на прерывания. И разветвление — это просто
flatMap
?2. @Eugene тогда что
CompletableFuture.cancel(true)
делать? Да, конечно, потоки должны реагировать на прерывание, чтобы заставить это работать, но для примера, который я привел, все три подзадачи являются блокирующими, поэтому в любом случае нужно было бы перехватитьInterruptedException
внутренне и повторно использовать его какCompletionException
. И я не думаю, что разветвление — это просто aflatMap
, потому что для того, чтобы разветвление работало, три подзадачи должны быть запущены из родительской задачи — или по завершении родительской задачи — что добавляет сложности.3. отмена не прерывает, прочитайте документацию. В нем буквально говорится, что
4. @Eugene Я обновил вопрос, чтобы использовать
Future
, а неCompletableFuture
— спасибо за предупреждение.
Ответ №1:
Ни в коем случае. Процесс не имеет асинхронного интерфейса (за исключением Process.OnExit()). Таким образом, вы должны использовать потоки для ожидания создания процесса и во время чтения из входных потоков. Другие компоненты вашей DAG могут быть асинхронными задачами (CompletableFutures).
Это не большая проблема. Единственное преимущество асинхронных задач перед потоками — меньшее потребление памяти. Ваш процесс в любом случае потребляет много памяти, поэтому здесь нет особого смысла экономить память.
Комментарии:
1. Я думаю, вы пропустили весь смысл моего вопроса.
Process
запускает другой поток, который выполняется параллельно, хотяwaitFor()
и блокируется при выходе этого потока. Вся причина, по которой я пытаюсь использоватьCompletableFuture
, заключается в том, чтобы сделать код выхода изwaitFor()
и stdout / stderr из процесса доступными асинхронно.2. stdout / stderr имеют только интерфейс блокировки, они недоступны асинхронно. Конечно, вы всегда можете преобразовать синхронный интерфейс в асинхронный (но не наоборот), но это было бы пустой тратой ресурсов.
3. waitFor уже возвращает код выхода, промежуточное завершение не требуется.