Обработка удаленных событий с помощью Java futures

#java #concurrency #completable-future

#java #параллелизм #завершаемый-будущее

Вопрос:

Я программирую взаимодействие в стиле RPC с микроконтроллерами на Java. Проблема, с которой я сталкиваюсь, заключается в блокировании выполнения клиентского кода до тех пор, пока я не получу результат от микроконтроллера, который поступает асинхронно.

А именно, я отправляю команды и получаю результаты в двух разных потоках (хотя и одного класса). Подход, который я использовал, заключается в использовании CompletableFuture, но это работает не так, как я ожидаю.

Мой метод вызова RPC отправляет команду и создает экземпляр CompletableFuture, как показано ниже:

 protected synchronized CompletableFuture<String> sendCommand(String command) {
    ... send command ...

    this.handler = new CompletableFuture<String>();

    return this.handler;
}
 

В вызывающем коде это выглядит так:

 CompletableFuture<String> result = procedure.sendCommand("readSensor(0x1508)");
String result = result.get(5, TimeUnit.SECONDS); // line X
 

Далее, есть метод прослушивания, который получает данные от микроконтроллера:

 protected synchronized void onReceiveResult(String data) {
    this.handler.complete(data); // line Y
}
 

Я ожидаю, что выполнение клиентского кода будет заблокировано в строке X, и это действительно так. Но по какой-то причине строка Y не разблокирует ее, что приводит к исключению тайм-аута.

Чтобы ответить на комментарии ниже…

Вызывающий код (извините, имена не совсем соответствуют тому, что я указал выше, но, я думаю, это единственное отличие):

 CompletableFuture<String> result = this.device.sendCommand(cmd);
log.debug("Waiting for callback, result="   result);
String sid = result.get(timeout, unit);
 

Выдает вывод:

 2016-10-14 21:58:30 DEBUG RemoteProcedure:36 - Waiting for callback, result=com.***.rpc.RemoteDevice$ActiveProcedure@44c519a2[Not completed]
 

Код завершения:

 log.debug("Dispatching msg ["   msg   "] to a procedure: "   this.commandForResult);
log.debug("result="   this.result);
log.debug("Cancelled = "   this.result.isCancelled());
log.debug("Done = "   this.result.isDone());
log.debug("CompletedExceptionally = "   this.result.isCompletedExceptionally());

boolean b = this.result.complete(msg);
this.result = null;

log.debug("b="   b);
 

Выдает вывод:

 2016-10-14 21:58:35 DEBUG RemoteDevice:141 - Dispatching msg [123] to a procedure: getId;
2016-10-14 21:58:35 DEBUG RemoteDevice:142 - result=com.***.rpc.RemoteDevice$ActiveProcedure@44c519a2[Not completed]
2016-10-14 21:58:35 DEBUG RemoteDevice:143 - Cancelled = false
2016-10-14 21:58:35 DEBUG RemoteDevice:144 - Done = false
2016-10-14 21:58:35 DEBUG RemoteDevice:145 - CompletedExceptionally = false
2016-10-14 21:58:35 DEBUG RemoteDevice:150 - b=true
 

ActiveProcedure является фактическим завершаемым в будущем:

 public static class ActiveProcedure extends CompletableFuture<String> {
    @Getter String command;
    public ActiveProcedure(String command) {
        this.command = command;
    }
}
 

Комментарии:

1. В принципе, это должно работать таким образом. Вы уверены, что onReceiveResult был вызван потоком слушателя? И вы уверены, что он вызывает complete в том же самом будущем, которого вы ждете? Если другой вызов SendCommand (выполняемый в третьем потоке) вмешивается, будущий участник изменился бы. Или обратный вызов выполняется слишком быстро и выполняется для старого участника future до создания нового. Возможно, измените порядок: сначала создайте future, затем отправьте.

2. ваши методы синхронизированы, что означает, что это какой-то параллельный код. И в то же время вы устанавливаете класс field ( this.handler ) , возвращаете его как результат и обновляете в onReceiveResult методе. Это какое-то противоречие, поскольку следующий поток будет переназначен handler , а старый никогда не будет обновлен

3. RPC означает отдельные JVM. Используете ли вы отдельные JVM, и если да, то CompletableFuturecannot не может охватывать эти JVM. Пожалуйста, уточните.

4. @Dmitry, эти методы определены как синхронизированные специально, чтобы избежать нежелательного сброса переменной обработчика. Это обновляется при каждом вызове. Причина: я не знаю, в ответ на какой вызов я получаю входные данные от микроконтроллера. Поэтому я направляю все в обработчик последней процедуры, который затем решает, принадлежит ли он ему.

5. @edharned, в микроконтроллере нет java. java работает только на стороне компьютера.

Ответ №1:

ОК. Все стало ясно:

Возникла проблема с интеграцией с базовой библиотекой, которую я использую для связи с микроконтроллером. Я ожидал, что получу данные с устройства в отдельном потоке, но это происходило в том же потоке. Поэтому CompletableFuture.get не был разблокирован.

Я не совсем понимаю механизм, приводящий к такому поведению, но размещение

 handler.complete(msg);
 

в отдельный поток проблема решена.