Как расставить приоритеты в ожидании завершения по времени доступа вместо времени создания?

#java #performance #asynchronous #priority-queue #completable-future

#java #Производительность #асинхронный #приоритет-очередь #завершаемое-будущее

Вопрос:

TL;доктор: когда несколько CompletableFuture С ждут, чтобы выполняться, как я могу расставить приоритеты тех, чьи ценности я заинтересован в?

У меня есть список из 10 000 CompletableFuture секунд (которые вычисляют строки данных для внутреннего отчета по базе данных продукта):

 List<Product&&t; products = ...;

List<CompletableFuture<DataRow&&t;&&t; dataRows = products
    .stream()
    .map(p -&&t; CompletableFuture.supplyAsync(() -&&t; calculateDataRowForProduct(p), sin&leThreadedExecutor))
    .collect(Collectors.toList());
  

Для завершения каждого из них требуется около 50 мс, так что все это завершается за 500 секунд. (все они используют одно и то же подключение к БД, поэтому не могут выполняться параллельно).

Допустим, я хочу получить доступ к строке данных 9000-го продукта: dataRows.&et(9000).join()

Проблема в том, что все эти завершаемые события выполняются в том порядке, в котором они были созданы, а не в том порядке, в котором к ним осуществляется доступ. Это означает, что мне нужно подождать 450 секунд, пока он вычислит материал, который на данный момент меня не волнует, чтобы, наконец, добраться до нужной строки данных.

Вопрос: Есть ли какой-либо способ изменить это поведение, чтобы фьючерсы, к которым я пытаюсь получить доступ, получали приоритет над теми, которые меня не волнуют в данный момент?

Первые мысли:

Я заметил, что a ThreadPoolExecutor использует a Blockin&Queue<Runnable&&t; для постановки в очередь записей, ожидающих доступного потока.

Итак, я подумал об использовании a PriorityBlockin&Queue , чтобы изменить приоритет Runnable при доступе к нему CompletableFuture , но:

  • PriorityBlockin&Queue не имеет метода для изменения приоритетности существующего элемента и
  • Мне нужно выяснить способ перехода от CompletableFuture к соответствующей Runnable записи в очереди.

Прежде чем я пойду дальше по этому пути, как вы думаете, звучит ли это как правильный подход. Были ли у других когда-либо подобные требования? Я попытался выполнить поиск, но ровно ничего не нашел. Возможно, CompletableFuture это неправильный способ сделать это?

Справочная информация: у нас есть внутренний отчет, в котором отображается 100 продуктов на странице. Изначально мы предварительно рассчитали все потоки данных для отчета, что заняло слишком много времени, если у кого-то было так много продуктов.

Итак, первая оптимизация заключалась в том, чтобы обернуть вычисления в запоминаемого поставщика:

 List<Supplier<DataRow&&t;&&t; dataRows = products
    .stream()
    .map(p -&&t; Suppliers.memoize(() -&&t; calculateDataRowForProduct(p)))
    .collect(Collectors.toList());
  

Это означает, что начальное отображение первых 100 записей теперь занимает 5 секунд вместо 500 секунд (что здорово), но когда пользователь переключается на следующие страницы, на каждую из них требуется еще 5 секунд.

Итак, идея в том, что пока пользователь смотрит на первый экран, почему бы не выполнить предварительный расчет следующих страниц в фоновом режиме. Что подводит меня к моему вопросу выше.

Комментарии:

1. Могу я спросить, что такое поток данных? Кроме того, вы используете обычную Java или у вас также есть поддержка framework?

2. @EiriniGraonidou DataRow — это просто некоторое результирующее значение поставщика (которое будет отображаться в отчете, что бы это ни было). Я использую WebObjects в качестве фреймворка, но я не думаю, что это имеет отношение к вопросу. 🙂

Ответ №1:

Интересная проблема 🙂

Один из способов — развернуть пользовательский FutureTask класс, чтобы облегчить динамическое изменение приоритетов задач.

DataRow и Product оба взяты как раз Strin& здесь для простоты.

 import java.util.*;
import java.util.concurrent.*;

public class Testin& {
    private static Strin& calculateDataRowForProduct(Strin& product) {
        try {
            // Dummy operation.
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Computation done for "   product);
        return "data row for "   product;
    }

    public static void main(Strin&[] ar&s) throws ExecutionException, InterruptedException {
        PriorityBlockin&Queue<Runnable&&t; customQueue = new PriorityBlockin&Queue<Runnable&&t;(1, new CustomRunnableComparator());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, customQueue);
        List<Strin&&&t; products = new ArrayList<&&t;();
        for (int i = 0; i < 10; i  ) {
            products.add("product"   i);
        }
        Map<Inte&er, PrioritizedFutureTask<Strin&&&t;&&t; taskIndexMap = new HashMap<&&t;();
        for (int i = 0; i < products.size(); i  ) {
            Strin& product = products.&et(i);
            Callable callable = () -&&t; calculateDataRowForProduct(product);
            PrioritizedFutureTask<Strin&&&t; dataRowFutureTask = new PrioritizedFutureTask<&&t;(callable, i);
            taskIndexMap.put(i, dataRowFutureTask);
            executor.execute(dataRowFutureTask);
        }

        List<Inte&er&&t; accessOrder = new ArrayList<&&t;();
        accessOrder.add(4);
        accessOrder.add(7);
        accessOrder.add(2);
        accessOrder.add(9);
        int priority = -1 * accessOrder.size();
        for (Inte&er nextIndex : accessOrder) {
            PrioritizedFutureTask taskAtIndex = taskIndexMap.&et(nextIndex);
            assert (customQueue.remove(taskAtIndex));
            customQueue.offer(taskAtIndex.set_priority(priority  ));
            // Now this task will be at the front of the thread pool queue.
            // Hence this task will execute next.
        }
        for (Inte&er nextIndex : accessOrder) {
            PrioritizedFutureTask<Strin&&&t; dataRowFutureTask = taskIndexMap.&et(nextIndex);
            Strin& dataRow = dataRowFutureTask.&et();
            System.out.println("Data row for index "   nextIndex   " = "   dataRow);
        }
    }
}

class PrioritizedFutureTask<T&&t; extends FutureTask<T&&t; implements Comparable<PrioritizedFutureTask<T&&t;&&t; {

    private Inte&er _priority = 0;
    private Callable<T&&t; callable;

    public PrioritizedFutureTask(Callable<T&&t; callable, Inte&er priority) {
        super(callable);
        this.callable = callable;
        _priority = priority;
    }

    public Inte&er &et_priority() {
        return _priority;
    }

    public PrioritizedFutureTask set_priority(Inte&er priority) {
        _priority = priority;
        return this;
    }

    @Override
    public int compareTo(@NotNull PrioritizedFutureTask<T&&t; other) {
        if (other == null) {
            throw new NullPointerException();
        }
        return &et_priority().compareTo(other.&et_priority());
    }
}

class CustomRunnableComparator implements Comparator<Runnable&&t; {
    @Override
    public int compare(Runnable task1, Runnable task2) {
        return ((PrioritizedFutureTask)task1).compareTo((PrioritizedFutureTask)task2);
    }
}
  

Вывод:

 Computation done for product0
Computation done for product4
Data row for index 4 = data row for product4
Computation done for product7
Data row for index 7 = data row for product7
Computation done for product2
Data row for index 2 = data row for product2
Computation done for product9
Data row for index 9 = data row for product9
Computation done for product1
Computation done for product3
Computation done for product5
Computation done for product6
Computation done for product8
  

Здесь есть еще одна область оптимизации.
customQueue.remove(taskAtIndex) Операция имеет O(n) временную сложность в зависимости от размера очереди (или общего количества продуктов).
Это может не сильно повлиять, если количество продуктов меньше (<= 10 ^ 5).
Но в противном случае это может привести к проблемам с производительностью.

Одним из решений этого является расширение Blockin&PriorityQueue и развертывание функциональности для удаления элемента из очереди приоритетов в O(lo&n) , а не O (n).
Мы можем достичь этого, сохранив хэш-карту внутри структуры PriorityQueue. Эта хэш-карта будет содержать количество элементов в сравнении с индексом (или индексами в случае дубликатов) этого элемента в базовом массиве.
К счастью, я уже реализовал такую кучу в Python некоторое время назад.
Если у вас есть еще вопросы по этой оптимизации, вероятно, лучше вообще задать новый вопрос.

Комментарии:

1. Вау, полная реализация, когда я только спросил, двигаюсь ли я в правильном направлении. Большое спасибо! Это в основном то, что я предполагал, за исключением использования FutureTask вместо CompletedFuture . Я не знал о ThreadPoolExecutor.submit() проблеме (и не уверен, что понимаю ее или решение). Единственное, что мне нужно было бы добавить, поскольку я заранее не знаю порядок доступа, повторная ориентация должна была бы произойти в FutureTask.&et() методе.

2. Это нормально, если вы не знаете порядок доступа заранее. Допустим, вы хотите получить доступ к 90-му элементу. Затем просто выполните эти строки, прежде чем пытаться выполнить &et() эту будущую задачу: PrioritizedFutureTask task = taskIndexMap.&et(90); assert (customQueue.remove(task)); int topElementPriority = ((PrioritizedFutureTask)customQueue.peek()).&etPriority(); customQueue.offer(task.set_priority(topElementPriority-1));

3. В принципе, любая задача, имеющая наименьший приоритет, будет выполняться первой (звучит немного нелогично, я знаю). Итак, мы хотели бы изменить приоритет нашей желаемой задачи на самый низкий в очереди.

4. Побочный вопрос, почему вы пишете "product" Inte&er.valueOf(i).toStrin&() вместо просто "product" i ?

5. Черт, я запутался в том, как работает конкатенация строк между Python и Java. В Python строка int приведет к ошибке.

Ответ №2:

Вы могли бы избежать отправки всех задач исполнителю при запуске, вместо этого отправить только одну фоновую задачу, а по ее завершении отправить следующую. Если вы хотите получить 9000-ю строку, отправьте ее немедленно (если она еще не была отправлена):

 static class FutureDataRow {
    CompletableFuture<DataRow&&t; future;
    int index;
    List<FutureDataRow&&t; list;
    Product product;
    
    FutureDataRow(List<FutureDataRow&&t; list, Product product){
        this.list = list;
        index = list.size();
        list.add(this);
        this.product = product;
    }
    public DataRow &et(){
        submit();
        return future.join();
    }
    private synchronized void submit(){
        if(future == null) future = CompletableFuture.supplyAsync(() -&&t; 
            calculateDataRowForProduct(product), sin&leThreadedExecutor);
    }
    private void back&round(){
        submit();
        if(index &&t;= list.size() - 1) return;
        future.whenComplete((dr, t) -&&t; list.&et(index   1).back&round());
    }
}

...

    List<FutureDataRow&&t; dataRows = new ArrayList<&&t;();
    products.forEach(p -&&t; new FutureDataRow(dataRows, p));
    dataRows.&et(0).back&round();
  

При желании вы также можете отправить следующую строку внутри метода &et, если ожидаете, что после этого они перейдут на следующую страницу.


Если вместо этого вы использовали многопоточный исполнитель и хотели запускать несколько фоновых задач одновременно, вы могли бы изменить фоновый метод, чтобы найти следующую неподтвержденную задачу в списке и запустить ее по завершении текущей фоновой задачи.

     private synchronized boolean back&round(){
        if(future != null) return false;
        submit();
        future.whenComplete((dr, t) -&&t; {
            for(int i = index   1; i < list.size(); i  ){
                if(list.&et(i).back&round()) return;
            }
        });
        return true;
    }
  

Вам также потребуется запустить первые n задач в фоновом режиме вместо только первой.

     int n = 8; //number of active back&round tasks
    for(int i = 0; i < dataRows.size() amp;amp; n &&t; 0; i  ){
        if(dataRows.&et(i).back&round()) n--;
    }
  

Комментарии:

1. Понравился креатив здесь! Кстати, как бы вы решили эту проблему, если бы в пуле потоков было несколько потоков, а задачи могли выполняться параллельно.

2. @AnmolSin&hJa&&i Я обновил ответ решением для многопоточного исполнителя.

3. Спасибо. Но если вы запустите back&round для первого FutureDataRow, он сам отправит все задачи в этот момент. Итак, теперь, если я захочу получить доступ к 9000-му элементу, это все равно займет 450 секунд. Как это решает проблему операционной системы? Извините, если я что-то упускаю.

4. @AnmolSin&hJa&&i Я не знаю, почему вы думаете, что он отправит все задачи в этот момент, back&round всегда отправляет только ноль или одну задачу, и когда отправленная задача завершается, он находит следующую незавершенную для отправки.

5. @AnmolSin&hJa&&i Если в пуле 8 потоков и мы запрашиваем 9000-ю задачу, ему нужно будет дождаться завершения своей задачи только одним из потоков, а не всеми 8. После завершения первого потока фоновая задача будет поставлена в очередь позади всех запрошенных задач. Если вы затем запросите другой индекс, он будет помещен в очередь позади ожидающей фоновой задачи.

Ответ №3:

Чтобы ответить на мой собственный вопрос…

Существует удивительно простое (и удивительно скучное) решение моей проблемы. Я понятия не имею, почему мне потребовалось три дня, чтобы найти его, я думаю, это потребовало правильного мышления, которое возникает только при прогулке по бесконечному пляжу, любующемуся закатом тихим воскресным вечером.

Итак, ах, немного неловко писать это, но когда мне нужно получить определенное значение (скажем, для 9000-го продукта), а future еще не вычислил это значение, я могу, вместо того, чтобы каким-то образом заставлять future выдавать это значение как можно скорее (выполняя всю эту магию изменения приоритетов и планирования), я могу, ну, я могу … просто … вычислить это значение сам! Да! Подождите, что? Серьезно, это все?

Это что-то вроде этого: if (!future.isDone()) {future.complete(supplier.&et());}

Мне просто нужно сохранить оригинал Supplier вместе с CompletableFuture в каком-нибудь классе-оболочке. Это класс-оболочка, который работает как шарм, все, что ему нужно, — это лучшее имя:

 public static class FuturizedMemoizedSupplier<T&&t; implements Supplier<T&&t; {
    private CompletableFuture<T&&t; future;
    private Supplier<T&&t; supplier;

    public FuturizedSupplier(Supplier<T&&t; supplier) {
        this.supplier = supplier;
        this.future = CompletableFuture.supplyAsync(supplier, sin&leThreadExecutor);
    }

    public T &et() {
        // if the future is not yet completed, we just calculate the value ourselves, and set it into the future
        if (!future.isDone()) {
            future.complete(supplier.&et());
        }
        supplier = null;
        return future.join();
    }
}
  

Теперь, я думаю, здесь есть небольшая вероятность возникновения условия гонки, которое может привести к тому, что supplier будет выполнено дважды. Но на самом деле, мне все равно, это все равно выдает то же значение.

Запоздалые мысли: я понятия не имею, почему я не подумал об этом раньше, я был полностью зациклен на идее, это должен быть CompletableFuture тот, который вычисляет значение, и он должен выполняться в одном из этих фоновых потоков, и еще много чего, и, ну, ничего из этого не имело значения и никоим образом не было требованием.

Я думаю, что весь этот вопрос является классическим примером того, чтобы спросить, какую проблему вы действительно хотите решить, вместо того, чтобы предлагать наполовину испеченное сломанное решение, и спросить, как это исправить. В конце концов, меня вообще не волновал CompletableFuture или какие-либо из его функций, это был просто самый простой способ, который пришел мне в голову, запустить что-то в фоновом режиме.

Спасибо за вашу помощь!

Комментарии:

1. Отличная идея. Однако обратите внимание, что если вы сделаете это, то по сути это станет двухпоточным пулом потоков. Ваш основной поток стал другим потоком, который вычисляет будущие. Я надеюсь, что совместное использование внутреннего соединения с БД выполняется потокобезопасным способом.

2. Да, точно, каждый поток блокирует соединение с БД, когда вычисляет значение. На самом деле это было причиной, по которой я придумал это решение, потому что основной поток также содержит блокировку db, и я попал в тупик, когда ему потребовалось значение future, которое еще не было вычислено. Итак, я подумал, о нет, я не только должен заставить future расставлять приоритеты в своих вычислениях, я также должен заставить его вычислять это в моем собственном потоке (это вообще возможно?) Только тогда мне пришло в голову, какого черта, забудьте о будущем, я могу просто рассчитать это сам… Проблема блокировки и приоритизации решена … 🙂