Как я могу реализовать протокол запроса-ответа без блокировки в ожидании ответа?

#java #networking #concurrency #protocols #nonblocking

#java #сеть #параллелизм #протоколы #неблокирующий

Вопрос:

Мне нужно реализовать приложение, которое одновременно взаимодействует с несколькими клиентами, используя (двунаправленный) протокол запроса-ответа. Ранее я реализовал это, используя два выделенных потока для каждого клиента (один читатель / реактор и один писатель / инициатор). Проблема в том, что управление потоками стало довольно сложным и уродливым. Существует ли какой-либо стандартный способ обработки этого, возможно, даже с одним потоком или, по крайней мере, постоянным количеством потоков для обработки всех клиентов?

Примерно так будет выглядеть некоторая связь в потоке с реализацией блокировки:

 Command response = request("cmd1", "a", "b");
if(!response.is("OK")) {
    return;
}
response = request("cmd2", "c");
if(!response.is("OK")) {
    return;
}
response = request("cmd3");
 

Между отправкой запроса и ожиданием соответствующего ответа я хотел бы, чтобы текущий поток мог выполнять другую работу, но затем продолжить выполнение, как только ответ действительно поступит.

Я знаю, что можно было бы использовать асинхронный ввод-вывод и зарегистрировать будущие / запускаемые экземпляры Java для запуска после получения ответа на запрос, но это легко превращается в многоуровневую вложенность анонимных запускаемых подклассов, и я подозреваю, что это будет больше боли, чем того стоит. Вероятно, это приведет к чему-то вроде примера ниже, который быстро становится сильно вложенным и нечитаемым. Наверняка должен быть более простой способ?

 request("cmd1", "a", "b", new ResponseHandler() {
    public void response(Command response) {
        if(response.is("OK")) {
            request("cmd2", "c", new ResponseHandler() {
                public void response(Command response) {
                    if(response.is("OK")) {
                        request("cmd3", new NullResponseHandler());
                    }
                }});
        }
    }});
 

Я также рассмотрел возможность использования выделенной инфраструктуры актера для обработки логики запроса-ответа. Хотя они выглядят так, как будто могут помочь в этой ситуации, я никогда раньше не работал с такими фреймворками, поэтому я не знаю, подходят ли они для этой ситуации.

Короче говоря, мой вопрос таков: как мне обрабатывать произвольное количество соединений запрос-ответ неблокирующим способом, чтобы было достаточно постоянного количества потоков? Являются ли фреймворки актеров допустимым подходом?

PS. Я использую Java для этого проекта.

Ответ №1:

Я думаю, это должно быть довольно просто, используя некоторые Java NIO framework, такие как, например, netty. Не использовал фреймворк Actor, поэтому не знаю, подойдет ли это лучше.

По сути, вы создаете один класс, который обрабатывает всю коммуникацию и хранит необходимую информацию — фреймворк обрабатывает весь поток в фоновом режиме, вы просто предоставляете метод, например, MessageReceived и обрабатываете его там.

Недостатком является то, что вам нужно в основном написать свой собственный конечный автомат, который может быть не таким простым, но это, безусловно, самый простой способ использования NIO.

Пример:

 enum State {
    S0, S1
}
private State state = State.S0;
public void messageReceived(
        ChannelHandlerContext ctx, MessageEvent e) {
    switch(state) {
    case S0:
        // read object from channel and write appropriate response
        e.getChannel().write("HELO");  // writes are asynchronous 
        state = State.S1;
        break;
    case S1:
        // same as S0
        e.getChannel().write("DONE");
        break;
    }
}
 

Обратите внимание, что чтение руководства по netty по-прежнему абсолютно необходимо, потому что в NIO есть вещи, которые не объясняются самостоятельно (по крайней мере, они не были для меня). Но есть несколько примеров, с которыми легко работать, которые должны научить вас основам.

Комментарии:

1. Спасибо. Я никогда не слышал и не использовал Netty раньше. Я обязательно изучу это. (Когда дело доходит до фреймворков Java NIO, я использовал только Apache MINA . Может быть, они похожи?) Написав свой собственный конечный автомат, я предполагаю, что он будет обрабатывать логику в вопросе, подобном: if(state == AwaitingResponseForA amp;amp; response.is("OK")) { /* Perform next step and change state */ } существуют ли какие-либо доступные платформы для помощи в подобных вещах? (В основном, я надеялся, что фреймворки Actor могут быть удобными.)

2. Я только что заметил ваше редактирование. Спасибо! Это очень полезно, и я почти наверняка собираюсь приступить к такому решению, однако мне бы очень хотелось найти платформу для оказания помощи в создании этого конечного автомата, поскольку в противном случае он может стать очень большим и недостижимым.

3. @Jiddo Да, они, вероятно, похожи. Я использовал netty только на основе некоторых сообщений здесь, на SO, где основное соглашение заключалось в том, что netty — самая быстрая и довольно простая структура (я не сравнивал, но это, конечно, не сложно). Я не знаю о какой-либо помощи в написании конечных автоматов (здесь могут быть полезны операторы yield / сопрограммы), но я полагаю, что вам все равно придется придумать все состояния и переходы, и в этом случае вы уже проделали большую часть тяжелой работы — просто создание переключателя и перечисление — это исключительномеханическая задача. Возможно, я сделал слишком много аппаратных средств;-)

Ответ №2:

Я думаю, что использование Javas без блокировки ввода-вывода — это правильный путь. Оттуда вы можете просто добавить ответ в BlockingQueue. Тогда может быть один поток, получающий элементы из BlockingQueue (или блокирующий, если там нет элементов) и обрабатывающий ответы последовательно.

Комментарии:

1. Спасибо. Это, безусловно, было бы началом, но я все еще не уверен в том, как обрабатывать зависимости после ответа в реальной логике приложения. Например, в тех случаях, когда действие требует отправки и получения нескольких пар запрос-ответ по порядку (т. Е. После отправки запроса A мы должны дождаться ответа A, прежде чем сможем отправить запрос B и т.д.).