Как обернуть поток с помощью ReentrantLock в завершаемый будущий вызов?

#java #java-8 #thread-safety

#java #java-8 #безопасность потоков

Вопрос:

Это моя текущая реализация, которая последовательно обрабатывает различные операции чтения / сохранения файлов:

 public void runThread(MyThreadImpl myThreadImpl) {
    synchronized (this) {
        this.myThreadImpl = myThreadImpl;
        notify();
    }
}

synchronized public void run() {
    while (true)
        try {
            wait();
            Global.myReentrantLock.lock();
            try {
                try {
                    myThreadImpl.call();
                } catch (FileException e) {
                    // trace e
                } catch (RuntimeException e) {
                    // trace e
                } catch (Exception e) {
                    // trace e
                }
            } finally {
                Global.myReentrantLock.unlock();
            }
        } catch (InterruptedException e) {
          // trace e
        } catch (Exception e) {
          // trace e
        }
}
  

У меня проблема в том, что я не жду результата потока перед выполнением другой операции, и я пришел к случаю, когда это необходимо.

Поскольку я использую Java 8, я хотел обернуть это в CompletableFuture. Как я могу сделать это с моей текущей реализацией?

Ответ №1:

Вы могли бы сделать следующее:

  • Вместо того, чтобы сохранять следующее задание, которое нужно выполнить, в виде одной ссылки ( this.myThreadImpl ), которая обновляется после освобождения блокировки, вы можете использовать очередь.
  • При добавлении нового задания создается CompletableFuture новое, и ссылка на него возвращается вызывающему.
  • Как только задание завершено, будущее завершено.

Обновляя ваш код и предполагая queue , что это очередь блокировки типа Queue<Pair<CompletableFuture<Void>, MyThreadImpl>> , вы бы:

 /**
 * @return a Future that will complete once the passed MyThreadImpl has been run.
 */
public CompletableFuture<Void> runThread(MyThreadImpl myThreadImpl) {
    Pair<CompletableFuture<Void>, MyThreadImpl> p = 
              new Pair<>(new CompletableFuture<>(),myThreadImpl);
    queue.add(p);
    return p.left;
}

public void run() {
    while (true) {
        try {

            Pair<CompletableFuture<MyThreadImpl>, MyThreadImpl> p = 
                  queue.take(); // will block until a job is added

            try {
                p.right.call();
                p.left.complete(null); // Future<Void> can only be completed with null. Alternatively, it could be completed with a relevant result.
            } catch (Exception e) {
                p.left.completeExceptionally(e);
            }

         } catch (InterruptedException e) {
            // trace e
         } 
   }
}
  

Здесь Pair просто должен быть pojo, подобный паре. ( ImmutablePair Например, это может быть apache commons.)

Блокирующие очереди обычно полезны, когда необходимо обработать материал: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html

Кроме того, вы смотрели ExecutorService ? Вы могли бы использовать тот, который основан на одном потоке, для последовательного выполнения заданий: этот submit(Callable<> task) метод очень похож runThread() на описанный выше, поскольку он возвращает a Future<Void> , который сообщит вам, когда задача будет выполнена.