Закройте Boost Websocket со стороны сервера, C , тайм-аут tcp::acceptor accept ()?

#c #boost #websocket #pthreads #poco

Вопрос:

ОБНОВЛЕНИЕ: Похоже, мне нужно решить мою проблему с помощью асинхронной реализации. Я обновлю свою публикацию новым направлением, как только завершу тестирование

Оригинал: В настоящее время я пишу многосерверное приложение, которое будет собирать, обмениваться и запрашивать информацию с нескольких компьютеров. В некоторых случаях машина A запросит информацию у машины B, но должна будет отправить ее машине C, которая ответит на A. Не вдаваясь слишком глубоко в то, что будет делать приложение, мне нужна помощь с моим клиентским приложением.

У меня есть клиентское приложение, разработанное с двумя потоками. Я использовал этот пример из boost в качестве основы для своего дизайна.

Поток один откроет веб-сайт клиента с помощью Machine-A, он будет передавать серию точек данных и команд. Вот урезанная версия моего кода

 #include "Poco/Clock.h"
#include "Poco/Task.h"
#include "Poco/Thread.h"
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <jsoncons/json.hpp>

namespace beast     = boost::beast;     // from <boost/beast.hpp>
namespace http      = beast::http;      // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net       = boost::asio;      // from <boost/asio.hpp>
using tcp           = net::ip::tcp;     // from <boost/asio/ip/tcp.hpp>

class ResponseChannel : public Poco::Runnable {
    void do_session(tcp::socket socket)
    {
        try {
            websocket::stream<tcp::socket> ws{std::move(socket)};
            ws.set_option(websocket::stream_base::decorator(
                [](websocket::response_typeamp; res) {
                    res.set(http::field::server,
                            std::string(BOOST_BEAST_VERSION_STRING)  
                                " websocket-server-sync");
                }));

            ws.accept();

            for (;;) {
                beast::flat_buffer buffer;
                ws.read(buffer);

                if (ws.got_binary()) {
                    // do something
                }
            }
        } catch (beast::system_error constamp; se) {
            if (se.code() != websocket::error::closed) {
                std::cerr << "do_session1 ->: " << se.code().message()
                          << std::endl;
                return;
            }
        } catch (std::exception constamp; e) {
            std::cerr << "do_session2 ->: " << e.what() << std::endl;
            return;
        }
    }

    virtual void run()
    {
        auto const address = net::ip::make_address(host);
        auto const port    = static_cast<unsigned short>(respPort);

        try {
            net::io_context ioc{1};
            tcp::acceptor   acceptor{ioc, {address, port}};
            tcp::socket     socket{ioc};

            for (; keep_running;) {
                acceptor.accept(socket);

                std::thread(amp;ResponseChannel::do_session, this,
                            std::move(socket))
                    .detach();
            }
        } catch (const std::exceptionamp; e) {
            std::cout << "run: " << e.what() << std::endl;
        }
    }

    void _terminate() { keep_running = false; }

  public:
    std::string                host;
    int                        respPort;
    bool                       keep_running  = true;
    int                        responseCount = 0;
    std::vector<long long int> latency_times;
    long long int              time_sum;
    Poco::Clock*               responseClock;
};

int main()
{
    using namespace std::chrono_literals;
    Poco::Clock     clock = Poco::Clock();
    Poco::Thread    response_thread;
    ResponseChannel response_channel;

    response_channel.responseClock = amp;clock;
    response_channel.host          = "0.0.0.0";
    response_channel.respPort      = 8080;
    response_thread.start(response_channel);
    response_thread.setPriority(Poco::Thread::Priority::PRIO_HIGH);

    // doing some work here. work will vary depending on command-line arguments
    std::this_thread::sleep_for(30s);
    response_channel.keep_running = false;
    response_thread.join();
}
 

То, как я спроектировал несколько машин, работает, как и ожидалось, в отношении отправки команд машине-B и получения результатов от машины-C.

Проблема, с которой я сталкиваюсь, заключается в закрытии потока 2, который содержит мой локальный канал ответа.

Я переходил туда и обратно между Poco::Thread и Poco::Task, но решил, что не хочу использовать Task, так как было бы ошибкой закрывать 2-й поток/задачу из основного потока. Мне нужно знать, что все пакеты были получены до закрытия 2-го потока.

Поэтому мне нужно закрыть события только после того, как я получил флаг websocket::ошибка::закрыто с компьютера-C. Отключение веб-узла, отсоединенного, потока не является проблемой, так как, когда появляется флаг, он заботится об этом за меня.

Однако, как часть процесса цикла для повторного подключения после закрытого сокета, поток просто ждет нового соединения.

 acceptor.accept(socket);
 

Он блокируется, и в документации, похоже, нет функции тайм-аута. Я вижу, что есть опция закрытия, но моя попытка использовать close просто вызвала исключение. Что в конечном счете добавляло сложности, чего я не хотел.

В конечном счете, я хочу, чтобы сервер непрерывно выполнял цикл через серию подключений как от машины-B, так и от машины-C, но только после завершения моего клиентского приложения. Последнее, что я делаю, прежде чем дождаться завершения потока Poco::, — это устанавливаю флаг, который я больше не хочу запускать на сервере Websocket.

Я поставил этот флаг перед блокировкой вызова accept (). Это сработало бы, только при идеальном времени поднятия флага новое соединение открывается, а затем закрывается, прежде чем вернуться назад, чтобы дождаться нового соединения.

В идеале должен быть тайм-аут, чтобы он выполнял цикл, сначала проверяя, истекло ли время ожидания, позволяя периодически проверять, хочу ли я, чтобы поток оставался открытым.

Кто-нибудь когда-нибудь сталкивался с этим?

Комментарии:

1. Я не понимаю, о чем меня спрашивают. Принять блокировку — это не проблема. Какая розетка закрывается? Акцептор? Зачем вам нужно было отменять прием? Если да, то почему бы не сделать его асинхронным?

2. В общем, я думаю, что все слишком сложно с точки зрения потоковой передачи. С небольшой асинхронностью (вы знаете, как намеревается Asio) вы, вероятно, сможете обслуживать все более эффективно и надежно из одного потока.

3. Hello @sehe. Спасибо за 4 комментария. Я не уверен в вашей ссылке на Asio. Я покопаюсь в этом и посмотрю, что смогу найти. Однако, что касается блокировки принятия. Я хотел бы найти способ предотвратить блокировку функции accept() или способ вывести ее из блока. Мне нужно иметь возможность проверить наличие флага, чтобы определить, следует ли мне ждать новых подключений или закрыть клиентское программное обеспечение. В конечном счете, я считаю, что мне нужно, по крайней мере, 2 потока, так как 1-й поток откроет веб-сайт с компьютера A на B. В то время как 2-я нить будет обрабатывать соединение от машины-C. Не знаю, как бы я это обошел.

4. Пока вы хотите писать синхронный код, другого пути нет. Цель асинхронности-параллелизм без потоковой передачи.

5. @sehe, спасибо тебе. Я рассмотрю асинхронную реализацию кода и обновлю свои результаты.