Вызов async_write из внешнего обработчика подключения в asio

#c 11 #boost-asio

Вопрос:

Я использую библиотеку asio для создания TCP-соединений. Асинхронная операция чтения/записи выполняется с помощью функции обработчика async_accept.

         mAcceptor.async_accept(*(mConnection->Socket()),
                boost::bind(amp;TCPConnection::HandleAcceptForWrite, this, 
                pI8Msg, 
                boost::asio::placeholders::error));
 
 void TCPConnection::HandleAcceptForWrite(
        INT8* pI8Msg, 
        const boost::system::error_codeamp; err)
{
    if (!err) {
        TransmitData(pI8Msg);//--> Want to call this fn from outside the handler
    }
    SocketAcceptConnection(pI8Msg);
}
 

Я хочу избежать вызова TransmitData (async_write) из обработчика.

Я намерен вызвать write из любого места за пределами обработчика accept. Когда я это делаю, я получаю ошибку — «Плохой файловый дескриптор»

Всегда ли необходимо выполнять асинхронную запись из обработчика? Пожалуйста, поделитесь любым примером кода, если его можно вызвать из другого места.

Комментарии:

1. Обратите внимание, что не владеющие необработанными указателями (pI8Msg) без индикаторов размера являются запахом кода C и вызывают большую категорию ошибок. Рассмотрим std:: vector<uint8_t> или std::span<uint8_t> (или действительно std::string / std::string_view ). О, и сделайте это постоянным, пока вы этим занимаетесь

Ответ №1:

Вы должны предоставить необходимый контекст. Например, что такое mConnection ? Почему mConnection->Socket() возвращает указатель? Если это сервер, то почему у него только один mConnection ?

Спелеология С Хрустальным Шаром

Используя свой хрустальный шар, я собираюсь ответить на этот вопрос с помощью

  • Mcconnection-это общий указатель на объект, инкапсулирующий сокет и некоторое состояние соединения
  • Он инициализируется новым экземпляром перед принятием, всегда
  • Поэтому, если только что-то другое не разделит право собственности *mConnection на него, оно по определению будет уничтожено при mConnection назначении нового экземпляра.

Все вышесказанное в совокупности дает только одно разумное объяснение: mConnection указывает на тип T , который происходит от enable_shared_from_this<T> того, чтобы он мог делиться собственностью с самим собой. Вы должны быть в состоянии увидеть это внутри TransmitData функции, где общий указатель должен быть записан в выражении привязки (или лямбда) для выполнения обработчика async_read завершения.

Что это делает, так это: поддерживает соединение, в терминах C : продлевает или гарантирует срок службы до тех пор, пока не вернется обработчик завершения. Обработчик завершения, в свою очередь, может инициировать дополнительную работу, которая разделяет права собственности (захватывает общий указатель) и так далее, пока последняя операция, которой принадлежит объект подключения, не потеряет интерес и объект подключения ( T ) не будет освобожден.


что делать?

Вам нужно поддерживать связь, даже если она бездействует. Есть много способов. Вы можете вставить общий указатель на него в «таблицу соединений» (например std::vector<shared_ptr<Connection> > ). Недостатком было бы то, что тогда становится трудно обрезать соединения: соединения всегда принадлежат, поэтому никогда не освобождаются. (Смотрите weak_ptr ниже!)

На практике я бы сделал соединение (давайте назовем тип Connection вместо T «с этого момента») ответственным: оно может решить, когда другая сторона повесила трубку, или когда произошел обмен, который сообщает о закрытии соединения, или, возможно, даже наступил тайм-аут.

Поскольку последнее очень распространено (соединения часто закрываются автоматически после установленного периода простоя), а также является наиболее гибким (вы можете установить срок действия на «бесконечный»), давайте покажем это:

Примечание: во время моих попыток превратить ваш фрагмент во что-то работающее, я заметил, что, как ни странно, акцептор, похоже, находится ВНУТРИ типа соединения? Это не имеет смысла, потому что как у вас может быть несколько соединений с одним приемником («сервером»)? Поэтому я изменил все, чтобы быть более типичным

Обратите внимание, что я все еще храню таблицу соединений ( mConnections ), но вместо хранения указателей на владение (например shared_ptr<> ) Я храню weak_ptr<> данные, чтобы мы могли наблюдать за соединениями, не изменяя их срок службы.

Живая демонстрация

Я использовал другой подход к своим обычным ответам и указал на множество деталей в комментариях:

 #include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <memory>
#include <iostream>
#include <list>

// Library facilities
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;

// Application types
using INT8     = std::int8_t;
using Executor = ba::io_context::executor_type;
using Duration = std::chrono::steady_clock::duration;

struct TCPConnection : public /*!*/ std::enable_shared_from_this<TCPConnection> {

    TCPConnection(Executor ex, Duration timeout = 120s)
        : mStrand(ex)
        , mSocket(mStrand)
        , mIdleTime(timeout)
        , mIdleTimer{mStrand}
    {
        // cannot use `shared_from_this` inside the constructor!
    }

    tcp::socketamp; Socket() { return mSocket; }

    ~TCPConnection() {
        // Demo output, to understand TCPConnection shared lifetime
        std::cerr << __PRETTY_FUNCTION__ << std::endl;
    }

    void StartSession() {
        // Here we start the idle timer, so we don't disappear until that
        // expires;

        // We don't post to the strand because we assume this is the very first
        // operation on this object (there can only be 1 thread aware of this
        // object)
        StartTimer();
    }

    void TransmitData(INT8 const* pI8Msg)
    {
        post(mStrand, [this, pI8Msg, self = shared_from_this()] {
            std::cout << "TransmitData on " << mSocket.remote_endpoint()
                      << std::endl;
            // Remove the following  line to make the timout also fire if the
            // writing takes too long
            mIdleTimer.cancel(); // We're busy, not idle

            // WARNING: assumes
            // - lifetime of pI8Msg guaranteed
            // - message NUL-terminated
            boost::asio::async_write(
                mSocket, ba::buffer(std::basic_string_view(pI8Msg)),
                boost::bind(amp;TCPConnection::OnWritten, shared_from_this(),
                            ba::placeholders::error(),
                            ba::placeholders::bytes_transferred()));
        });
    }

  private:
    void StartTimer() {
        mIdleTimer.expires_from_now(mIdleTime);
        mIdleTimer.async_wait(boost::bind(amp;TCPConnection::OnTimer,
                                          shared_from_this(),
                                          ba::placeholders::error()));
    }

    void OnTimer(error_code ec)
    {
        if (ec != ba::error::operation_aborted) // cancellation is not timeout
        {
            std::cout << "TCPConnection timeout, closing " << mSocket.remote_endpoint() << std::endl;
            // Timeout, let's respond by closing the socket and letting the
            // connection disappear

            // No need to post to the strand since the completion already
            // happens on the strand
            mSocket.cancel();
            mSocket.shutdown(tcp::socket::shutdown_both);

            // No more async operations so shared_from_this goes out of scope
        }
    }

    void OnWritten(error_code ec, size_t bytes_transferred)
    {
        if (ec.failed()) {
            std::cerr << "OnWritten: " << ec.message() << std::endl;
            // Error, we let the connection die

            // In case the timer wasn't canceled pre-write (see comment in
            // TransmitData) let's make sure it's canceled here
            mIdleTimer.cancel();
            // ignoring errors:
            mSocket.shutdown(tcp::socket::shutdown_both, ec);
            mSocket.close(ec);
        }

        // write was successful, let's restart timer
        std::cerr << "OnWritten: " << ec.message() << " (" << bytes_transferred
                  << " bytes)" << std::endl;

        StartTimer();
    }

    ba::strand<Executor> mStrand; // serialize execution
    tcp::socket          mSocket;
    Duration             mIdleTime;
    ba::steady_timer     mIdleTimer{mStrand, mIdleTime};
};

struct Server {
    Server(Executor ex, uint16_t port)
        : mExecutor(ex)
        , mAcceptor{make_strand(mExecutor), {{}, port}}
    {
        AcceptLoop();
    }

    void TransmitData(INT8 const* pI8Msg) {
        for (autoamp; weak : mConnections) {
            if (auto conn = weak.lock()) {
                conn->TransmitData(pI8Msg);
            }
        }
    }

  private:
    std::shared_ptr<TCPConnection> mConnection;
    std::list<std::weak_ptr<TCPConnection> > mConnections;

    void AcceptLoop()
    {
        mConnection = std::make_shared<TCPConnection>(mExecutor, 1s);

        mAcceptor.async_accept(mConnection->Socket(),
                               boost::bind(amp;Server::HandleAccept, this,
                                           boost::asio::placeholders::error()));
    }

    void HandleAccept(error_code err)
    {
        if (!err) {
            std::cerr << "HandleAccept: "
                      << mConnection->Socket().remote_endpoint() << std::endl;
            mConnection->StartSession(); // no tramsmit, just a keeping
                                         // the connection open

            mConnections.push_back(
                mConnection); // store so we can send messages later

            // optional: garbage collect the mConnections table
            mConnections.remove_if(
                std::mem_fn(amp;std::weak_ptr<TCPConnection>::expired));

            // Keep accepting connections
            AcceptLoop();
        }
    }

    Executor      mExecutor;
    tcp::acceptor mAcceptor;
};

int main() {
    ba::io_context mIo;

    Server srv(mIo.get_executor(), 7979);

    std::thread delayed_transmission([amp;srv] {
        static INT8 const message[] =
            "HELLO WORLDn"; // NUL-terminated and static!
        std::this_thread::sleep_for(3s);
        std::cout << "Sending '" << message << "'" << std::endl;
        srv.TransmitData(message);
    });

    // mIo.run();
    mIo.run_for(10s); // timelimited for Coliru

    delayed_transmission.join();
}
 

Это демонстрация живого запуска, демонстрирующая как истекшие по времени соединения, так и успешные TransmitData вызовы:

  • Я установил тайм-аут на 1 сек,
  • Передача осуществляется через 3 секунды после запуска сервера ( delayed_transmission )
  • Я начинаю один набор из 3 подключений рано, что означает тайм-аут,
  • Я немедленно запускаю еще один набор из 3 соединений, как раз вовремя, чтобы получить задержанную передачу
  • Общее время работы сервера ограничено 10 секундами для демонстрационных целей

введите описание изображения здесь

Вывод текста

Для удобства ознакомления (взято из http://coliru.stacked-crooked.com/a/def4f1927c7b975f):

  • Сервер
      ./a.out amp;
     HandleAccept: 127.0.0.1:43672
     HandleAccept: 127.0.0.1:43674
     HandleAccept: 127.0.0.1:43676
     TCPConnection timeout, closing 127.0.0.1:43672
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43674
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43676
     TCPConnection::~TCPConnection()
     HandleAccept: 127.0.0.1:43678
     HandleAccept: 127.0.0.1:43680
     HandleAccept: 127.0.0.1:43682
     Sending 'HELLO WORLD
     '
     TransmitData on 127.0.0.1:43678
     TransmitData on 127.0.0.1:43680
     TransmitData on 127.0.0.1:43682
     OnWritten: SuccessHELLO WORLD
      (12 bytes)
     OnWritten: Success (12 bytes)
     OnWritten: Success (12 bytes)
     TCPConnection timeout, closing 127.0.0.1:43678
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43680
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43682
     TCPConnection::~TCPConnection()
     
  • Клиенты
      sleep 1
     (for a in {1..3}; do netcat 127.0.0.1 7979amp; done; time wait)
    
     real    0m1.011s
     user    0m0.012s
     sys 0m0.004s
    
     sleep 0.5
     (for a in {1..3}; do netcat 127.0.0.1 7979amp; done; time wait)
     HELLO WORLD
     HELLO WORLD
     HELLO WORLD
    
     real    0m1.478s
     user    0m0.008s
     sys 0m0.008s
     

Комментарии:

1. ошибка: нет соответствующей функции для вызова‘boost::asio::basic_stream_socket<boost::asio::ip::tcp>::basic_stream_socket(boost::asio::strand<boost::asio::ip::tcp><boost::asio::io_context::executor_type>amp;)’ , mSocket(mStrand) Почему я получаю эту ошибку? Я использую boost lib 1.66.

2. Да, это старо. Тогда исполнителей так не поддерживали. Что раздражает, потому что вы должны все время помнить о том, чтобы размещать/обертывать пряди. Кроме того, строковое представление не было бы поддержано, что только подтверждает, насколько вам нужно рассмотреть возможность использования более безопасного типа. Вот мое мнение: coliru.stacked-crooked.com/a/62a38e2ca27871c8 (теперь у него есть MAXLEN, потому что простой strlen не работает с INT*)

3. Здесь wandbox.org/permlink/ns0D7eYPybesSqOp и вот godbolt.org/z/nYYG6rd8q вы можете видеть, что он компилируется с Boost 1.66 (хотя там не так много живого теста, как на Coliru). Он уже совместим с c 14.

4. Чтобы получить совместимость с C 11, вам нужно только переписать литералы времени хроно и инициализатор захвата лямбды: godbolt.org/z/azv3sn5T4

5. Уэлп. Чтобы добраться до C 03, требуется гораздо больше работы. И намного больше стимулов 🙂 godbolt.org/z/K1oYqh7Pf Это был мой кусочек возвращения в прошлое на тот день.