Построение DAG отменяемых задач Java

#java #asynchronous #future #futuretask

#java #асинхронный #будущее #будущая задача

Вопрос:

Я хочу создать DAG из задач на Java, где задачи могут зависеть от результатов других задач. Если между двумя задачами нет направленного пути, они могут выполняться параллельно. Задачи могут быть отменены. Если какая-либо задача вызывает исключение, все задачи отменяются.

Я хотел использовать CompleteableFuture для этого, но, несмотря на реализацию Future интерфейса (в том числе Future.cancel(boolean) , CompletableFuture не поддерживает отмену CompletableFuture.cancel(true) просто игнорируется. (Кто-нибудь знает почему?)

Поэтому я прибегаю к созданию своей собственной базы данных задач с использованием Future . Это очень шаблонно, и в нем сложно разобраться правильно. Есть ли какой-нибудь лучший метод, чем этот?

Вот один из примеров:

  1. Я хочу вызвать Process process = Runtime.getRuntime().exec(cmd) , чтобы запустить процесс командной строки, создав Future<Process> . Затем я хочу запустить (разветвить) три подзадачи:
    • Одна задача, которая потребляет входные данные от process.getInputStream()
    • Одна задача, которая потребляет входные данные от process.getErrorStream()
    • Одна задача, которая вызывает process.waitFor() , а затем ожидает результата.
  2. Затем я хочу дождаться завершения всех трех запущенных подзадач (т. Е. Барьер ввода-вывода / завершения). Это должно быть собрано в финале Future<Integer> exitCode , который собирает код выхода, возвращенный process.waitFor() задачей. Две входные потребительские задачи просто возвращаются Void , поэтому их вывод можно игнорировать, но барьер завершения все равно должен ждать их завершения.

Я хочу, чтобы сбой в любой из запущенных подзадач привел к отмене всех подзадач и уничтожению базового процесса.

Обратите внимание, что Process process = Runtime.getRuntime().exec(cmd) на первом шаге может возникнуть исключение, которое должно привести к каскадированию сбоя на всем пути exitCode .

 @FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
    public void accept(T val) throws IOException;
}

public static Future<Integer> exec(
        ConsumerThrowingIOException<InputStream> stdoutConsumer,
        ConsumerThrowingIOException<InputStream> stderrConsumer,
        String... cmd) {

    Future<Process> processFuture = executor.submit(
            () -> Runtime.getRuntime().exec(cmd));

    AtomicReference<Future<Void>> stdoutProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Void>> stderrProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Integer>> exitCodeFuture = //
            new AtomicReference<>();

    Runnable cancel = () -> {
        try {
            processFuture.get().destroy();
        } catch (Exception e) {
            // Ignore (exitCodeFuture.get() will still detect exceptions)
        }
        if (stdoutProcessorFuture.get() != null) {
            stdoutProcessorFuture.get().cancel(true);
        }
        if (stderrProcessorFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
        if (exitCodeFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
    };

    if (stdoutConsumer != null) {
        stdoutProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream inputStream = processFuture.get()
                        .getInputStream();
                stdoutConsumer.accept(inputStream != null
                        ? inputStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    if (stderrConsumer != null) {
        stderrProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream errorStream = processFuture.get()
                        .getErrorStream();
                stderrConsumer.accept(errorStream != null
                        ? errorStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    exitCodeFuture.set(executor.submit(() -> {
        try {
            return processFuture.get().waitFor();
        } catch (Exception e) {
            cancel.run();
            throw e;
        }
    }));

    // Async completion barrier -- wait for process to exit,
    // and for output processors to complete
    return executor.submit(() -> {
        Exception exception = null;
        int exitCode = 1;
        try {
            exitCode = exitCodeFuture.get().get();
        } catch (InterruptedException | CancellationException
                | ExecutionException e) {
            cancel.run();
            exception = e;
        }
        if (stderrProcessorFuture.get() != null) {
            try {
                stderrProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (stdoutProcessorFuture.get() != null) {
            try {
                stdoutProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (exception != null) {
            throw exception;
        } else {
            return exitCode;
        }
    });
}
 

Примечание: я понимаю, что Runtime.getRuntime().exec(cmd) это должно быть неблокирующим, поэтому не требует своего собственного Future , но я все равно написал код, используя его, чтобы указать на построение DAG.

Комментарии:

1. CompletableFuture не поддерживает отмену, если только вы не выключите базовый пул и ваши потоки не отреагируют на прерывания. И разветвление — это просто flatMap ?

2. @Eugene тогда что CompletableFuture.cancel(true) делать? Да, конечно, потоки должны реагировать на прерывание, чтобы заставить это работать, но для примера, который я привел, все три подзадачи являются блокирующими, поэтому в любом случае нужно было бы перехватить InterruptedException внутренне и повторно использовать его как CompletionException . И я не думаю, что разветвление — это просто a flatMap , потому что для того, чтобы разветвление работало, три подзадачи должны быть запущены из родительской задачи — или по завершении родительской задачи — что добавляет сложности.

3. отмена не прерывает, прочитайте документацию. В нем буквально говорится, что

4. @Eugene Я обновил вопрос, чтобы использовать Future , а не CompletableFuture — спасибо за предупреждение.

Ответ №1:

Ни в коем случае. Процесс не имеет асинхронного интерфейса (за исключением Process.OnExit()). Таким образом, вы должны использовать потоки для ожидания создания процесса и во время чтения из входных потоков. Другие компоненты вашей DAG могут быть асинхронными задачами (CompletableFutures).

Это не большая проблема. Единственное преимущество асинхронных задач перед потоками — меньшее потребление памяти. Ваш процесс в любом случае потребляет много памяти, поэтому здесь нет особого смысла экономить память.

Комментарии:

1. Я думаю, вы пропустили весь смысл моего вопроса. Process запускает другой поток, который выполняется параллельно, хотя waitFor() и блокируется при выходе этого потока. Вся причина, по которой я пытаюсь использовать CompletableFuture , заключается в том, чтобы сделать код выхода из waitFor() и stdout / stderr из процесса доступными асинхронно.

2. stdout / stderr имеют только интерфейс блокировки, они недоступны асинхронно. Конечно, вы всегда можете преобразовать синхронный интерфейс в асинхронный (но не наоборот), но это было бы пустой тратой ресурсов.

3. waitFor уже возвращает код выхода, промежуточное завершение не требуется.