#java #concurrency #completable-future
#java #параллелизм #завершаемый-будущее
Вопрос:
Я программирую взаимодействие в стиле RPC с микроконтроллерами на Java. Проблема, с которой я сталкиваюсь, заключается в блокировании выполнения клиентского кода до тех пор, пока я не получу результат от микроконтроллера, который поступает асинхронно.
А именно, я отправляю команды и получаю результаты в двух разных потоках (хотя и одного класса). Подход, который я использовал, заключается в использовании CompletableFuture, но это работает не так, как я ожидаю.
Мой метод вызова RPC отправляет команду и создает экземпляр CompletableFuture, как показано ниже:
protected synchronized CompletableFuture<String> sendCommand(String command) {
... send command ...
this.handler = new CompletableFuture<String>();
return this.handler;
}
В вызывающем коде это выглядит так:
CompletableFuture<String> result = procedure.sendCommand("readSensor(0x1508)");
String result = result.get(5, TimeUnit.SECONDS); // line X
Далее, есть метод прослушивания, который получает данные от микроконтроллера:
protected synchronized void onReceiveResult(String data) {
this.handler.complete(data); // line Y
}
Я ожидаю, что выполнение клиентского кода будет заблокировано в строке X, и это действительно так. Но по какой-то причине строка Y не разблокирует ее, что приводит к исключению тайм-аута.
Чтобы ответить на комментарии ниже…
Вызывающий код (извините, имена не совсем соответствуют тому, что я указал выше, но, я думаю, это единственное отличие):
CompletableFuture<String> result = this.device.sendCommand(cmd);
log.debug("Waiting for callback, result=" result);
String sid = result.get(timeout, unit);
Выдает вывод:
2016-10-14 21:58:30 DEBUG RemoteProcedure:36 - Waiting for callback, result=com.***.rpc.RemoteDevice$ActiveProcedure@44c519a2[Not completed]
Код завершения:
log.debug("Dispatching msg [" msg "] to a procedure: " this.commandForResult);
log.debug("result=" this.result);
log.debug("Cancelled = " this.result.isCancelled());
log.debug("Done = " this.result.isDone());
log.debug("CompletedExceptionally = " this.result.isCompletedExceptionally());
boolean b = this.result.complete(msg);
this.result = null;
log.debug("b=" b);
Выдает вывод:
2016-10-14 21:58:35 DEBUG RemoteDevice:141 - Dispatching msg [123] to a procedure: getId;
2016-10-14 21:58:35 DEBUG RemoteDevice:142 - result=com.***.rpc.RemoteDevice$ActiveProcedure@44c519a2[Not completed]
2016-10-14 21:58:35 DEBUG RemoteDevice:143 - Cancelled = false
2016-10-14 21:58:35 DEBUG RemoteDevice:144 - Done = false
2016-10-14 21:58:35 DEBUG RemoteDevice:145 - CompletedExceptionally = false
2016-10-14 21:58:35 DEBUG RemoteDevice:150 - b=true
ActiveProcedure является фактическим завершаемым в будущем:
public static class ActiveProcedure extends CompletableFuture<String> {
@Getter String command;
public ActiveProcedure(String command) {
this.command = command;
}
}
Комментарии:
1. В принципе, это должно работать таким образом. Вы уверены, что onReceiveResult был вызван потоком слушателя? И вы уверены, что он вызывает complete в том же самом будущем, которого вы ждете? Если другой вызов SendCommand (выполняемый в третьем потоке) вмешивается, будущий участник изменился бы. Или обратный вызов выполняется слишком быстро и выполняется для старого участника future до создания нового. Возможно, измените порядок: сначала создайте future, затем отправьте.
2. ваши методы синхронизированы, что означает, что это какой-то параллельный код. И в то же время вы устанавливаете класс field (
this.handler
) , возвращаете его как результат и обновляете вonReceiveResult
методе. Это какое-то противоречие, поскольку следующий поток будет переназначенhandler
, а старый никогда не будет обновлен3. RPC означает отдельные JVM. Используете ли вы отдельные JVM, и если да, то CompletableFuturecannot не может охватывать эти JVM. Пожалуйста, уточните.
4. @Dmitry, эти методы определены как синхронизированные специально, чтобы избежать нежелательного сброса переменной обработчика. Это обновляется при каждом вызове. Причина: я не знаю, в ответ на какой вызов я получаю входные данные от микроконтроллера. Поэтому я направляю все в обработчик последней процедуры, который затем решает, принадлежит ли он ему.
5. @edharned, в микроконтроллере нет java. java работает только на стороне компьютера.
Ответ №1:
ОК. Все стало ясно:
Возникла проблема с интеграцией с базовой библиотекой, которую я использую для связи с микроконтроллером. Я ожидал, что получу данные с устройства в отдельном потоке, но это происходило в том же потоке. Поэтому CompletableFuture.get не был разблокирован.
Я не совсем понимаю механизм, приводящий к такому поведению, но размещение
handler.complete(msg);
в отдельный поток проблема решена.