Поток, спящий в пуле потоков

#java #multithreading #performance #threadpool #threadpoolexecutor

#java #многопоточность #Производительность #threadpool #threadpoolexecutor

Вопрос:

Допустим, у нас есть пул потоков с ограниченным числом потоков.

 Executor executor = Executors.newFixedThreadPool(3);
  

Теперь предположим, что одна из активных задач должна находиться в режиме ожидания в течение 3 секунд (по какой-либо причине).

 executor.execute(() -> {
    try {
        Thread.sleep(3000L);
    } catch (InterruptedException ignore) {}
});
  

Как мы можем реализовать такой пул потоков таким образом, чтобы, когда задача переходит в режим ожидания (или ожидает на мониторе / условии), поток1 можно было эффективно использовать для выполнения другой задачи?

1 Под потоком я не имею в виду «физический» поток Java, потому что это было бы невозможно, пока поток находится в режиме ожидания. Я имею в виду, что пул потоков должен иметь абстрактную реализацию, которая, по-видимому, практически позволяет потоку запускать другую задачу во время сна. Ключевым моментом является то, что всегда есть N одновременно запущенных (не спящих) задач.

Несколько похоже на то, как монитор обрабатывает доступ к критической области:

  • Если поток ожидает ресурс, ресурс может быть использован другим потоком.
  • Если поток уведомлен, он помещается в набор ожидающих для (повторного) получения доступа к этому ресурсу.

Комментарии:

1. Я думаю, заставьте его выполнять какую-то работу вместо того, чтобы спать.

2. @PavelSmirnov Вы можете построить рабочую абстракцию на этом? Тот, который учитывает случаи с ожиданием на ресурсе? Что, если выполняемое задание также переходит в спящий режим (выполняет другое задание), затем другое, затем еще одно и т.д… Тогда исходная задача никогда не получит возможности продолжить.

3. что ж, если у вас есть задание, для выполнения которого требуется блокировка, поток может попытаться получить блокировку и выполнить задание, только если оно выполнено успешно. В противном случае он может переключиться на другое задание, ожидающее в очереди, и попытаться выполнить это или вернуться к задаче исходного потока. Я хотел показать, что вам лучше выполнить какую-то работу, если это возможно, вместо того, чтобы просто спать.

4. Это напоминает мне сопрограммы в lua — посмотрите это. Это может послужить для вас источником вдохновения.

5. Интересный вопрос. Я не верю, что вы получите удовлетворительный ответ, поскольку вы в основном пытаетесь реализовать планировщик потоков поверх ОС. Вы могли бы получить, используя блокировки и P > K фоновых потоков с пользовательскими методами «сна».

Ответ №1:

То, о чем вы просите, по сути, реализует сопрограммы / волокна поверх потока JVM / OS. Санхонг Ли выступил с замечательным докладом о том, как инженеры Alibaba реализовали такую конструкцию — идея заключается в том, что вместо того, чтобы полагаться на планировщик потоков ОС, вам нужно полагаться на свой собственный селектор.

Смотрите также проект Loom для волокон (пользовательские зеленые потоки).

Ответ №2:

Я реализовал минимальный рабочий пример, который в основном делает то, что, я думаю, вы хотите.

Интерфейс задачи (очень похожий на интерфейс runnable, только с переданным контекстом для выполнения ожидания)

 package io.medev.stackoverflow;

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public interface Task {

    /**
     * Wraps the given runnable into a Task with a not guessable execution time (meaning guessExecutionTime always returns Long.MAX_VALUE)
     * @param runnable The runnable to wrap
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable) {
        return wrap(runnable, Long.MAX_VALUE);
    }

    /**
     * Wraps the given runnable using the given guessedExecutionTimeMillis
     * @param runnable The runnable to wrap
     * @param guessedExecutionTimeMillis The guessed execution time in millis for this runnable
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable, long guessedExecutionTimeMillis) {
        return new Task() {
            @Override
            public long guessExecutionTimeMillis() {
                return guessedExecutionTimeMillis;
            }

            @Override
            public void run(Context context) {
                runnable.run();
            }
        };
    }

    /**
     * Should more or less guess how long this task will run
     * @return The execution time of this Task in milliseconds
     */
    long guessExecutionTimeMillis();

    void run(Context context);

    interface Context {

        /**
         * Block until the condition is met, giving other Tasks time to execute
         * @param condition the condition to check
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition) throws InterruptedException;

        /**
         * Blocks at least for the given duration, giving other Tasks time to execute
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(long timeout, TimeUnit timeUnit) throws InterruptedException;

        /**
         * Blocks until the condition is met or the timeout expires, giving other Tasks time to execute
         * @param condition the condition to check
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException;
    }
}
  

И базовый фиксированный исполнитель пула потоков — но здесь вы должны зависеть от конкретной реализации:

 package io.medev.stackoverflow;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class TimeEfficientExecutor implements Executor {

    private final BlockingQueue<Task> taskQueue;
    private final CountDownLatch latch;
    private volatile boolean alive;

    public TimeEfficientExecutor(int threads) {
        this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
        this.latch = new CountDownLatch(threads);
        this.alive = true;

        for (int i = 0; i < threads; i  ) {
            Thread thread = new Thread(new TimeEfficientExecutorRunnable());
            thread.start();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        execute(Task.wrap(runnable));
    }

    public void execute(Runnable runnable, long guessedExecutionTimeMillis) {
        execute(Task.wrap(runnable, guessedExecutionTimeMillis));
    }

    public void execute(Task task) {
        this.taskQueue.offer(task);
    }

    public void shutdown() {
        this.alive = false;
    }

    public void awaitShutdown() throws InterruptedException {
        this.latch.await();
    }

    public void awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.latch.await(timeout, timeUnit);
    }

    private class TimeEfficientExecutorRunnable implements Runnable {

        @Override
        public void run() {
            try {
                while (TimeEfficientExecutor.this.alive) {
                    Task task = TimeEfficientExecutor.this.taskQueue.poll();

                    if (task != null) {
                        try {
                            task.run(new IdleTaskContext());
                        } catch (Exception e) {
                            // TODO: logging
                        }
                    }
                }
            } finally {
                TimeEfficientExecutor.this.latch.countDown();
            }
        }
    }

    private class IdleTaskContext implements Task.Context {

        @Override
        public void idle(BooleanSupplier condition) throws InterruptedException {
            idle(condition, Long.MAX_VALUE);
        }

        @Override
        public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(() -> false, timeout, timeUnit);
        }

        @Override
        public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(condition, System.currentTimeMillis()   timeUnit.toMillis(timeout));
        }

        private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException {
            long leftMillis = idleUntilTs - System.currentTimeMillis();

            while (TimeEfficientExecutor.this.alive amp;amp; !condition.getAsBoolean() amp;amp; leftMillis >= 1L) {
                Task task = TimeEfficientExecutor.this.taskQueue.poll(leftMillis, TimeUnit.MILLISECONDS);
                leftMillis = idleUntilTs - System.currentTimeMillis();

                if (task != null) {
                    if (leftMillis >= 1L amp;amp; task.guessExecutionTimeMillis() < leftMillis) {
                        task.run(new IdleTaskContext());
                    } else {
                        TimeEfficientExecutor.this.taskQueue.offer(task);
                    }
                }
            }
        }
    }
}
  

Обратите внимание, что вы не можете просто спуститься по стеку — и стек привязан к исполняющему потоку. Это означает, что невозможно вернуться к основной задаче ожидания, если какая-то «вспомогательная» задача начинает работать в режиме ожидания. Вы должны «доверять» тому, что каждая задача возвращает в guessExecutionTimeMillis -методе.

Благодаря PriorityQueue, используемому в исполнителе, очередь всегда будет возвращать задачу с наименьшим временем выполнения.