#c 11 #boost-asio
Вопрос:
Я использую библиотеку asio для создания TCP-соединений. Асинхронная операция чтения/записи выполняется с помощью функции обработчика async_accept.
mAcceptor.async_accept(*(mConnection->Socket()),
boost::bind(amp;TCPConnection::HandleAcceptForWrite, this,
pI8Msg,
boost::asio::placeholders::error));
void TCPConnection::HandleAcceptForWrite(
INT8* pI8Msg,
const boost::system::error_codeamp; err)
{
if (!err) {
TransmitData(pI8Msg);//--> Want to call this fn from outside the handler
}
SocketAcceptConnection(pI8Msg);
}
Я хочу избежать вызова TransmitData (async_write) из обработчика.
Я намерен вызвать write из любого места за пределами обработчика accept. Когда я это делаю, я получаю ошибку — «Плохой файловый дескриптор»
Всегда ли необходимо выполнять асинхронную запись из обработчика? Пожалуйста, поделитесь любым примером кода, если его можно вызвать из другого места.
Комментарии:
1. Обратите внимание, что не владеющие необработанными указателями (pI8Msg) без индикаторов размера являются запахом кода C и вызывают большую категорию ошибок. Рассмотрим
std:: vector<uint8_t>
илиstd::span<uint8_t>
(или действительноstd::string
/std::string_view
). О, и сделайте это постоянным, пока вы этим занимаетесь
Ответ №1:
Вы должны предоставить необходимый контекст. Например, что такое mConnection
? Почему mConnection->Socket()
возвращает указатель? Если это сервер, то почему у него только один mConnection
?
Спелеология С Хрустальным Шаром
Используя свой хрустальный шар, я собираюсь ответить на этот вопрос с помощью
- Mcconnection-это общий указатель на объект, инкапсулирующий сокет и некоторое состояние соединения
- Он инициализируется новым экземпляром перед принятием, всегда
- Поэтому, если только что-то другое не разделит право собственности
*mConnection
на него, оно по определению будет уничтожено приmConnection
назначении нового экземпляра.
Все вышесказанное в совокупности дает только одно разумное объяснение: mConnection
указывает на тип T
, который происходит от enable_shared_from_this<T>
того, чтобы он мог делиться собственностью с самим собой. Вы должны быть в состоянии увидеть это внутри TransmitData
функции, где общий указатель должен быть записан в выражении привязки (или лямбда) для выполнения обработчика async_read
завершения.
Что это делает, так это: поддерживает соединение, в терминах C : продлевает или гарантирует срок службы до тех пор, пока не вернется обработчик завершения. Обработчик завершения, в свою очередь, может инициировать дополнительную работу, которая разделяет права собственности (захватывает общий указатель) и так далее, пока последняя операция, которой принадлежит объект подключения, не потеряет интерес и объект подключения ( T
) не будет освобожден.
что делать?
Вам нужно поддерживать связь, даже если она бездействует. Есть много способов. Вы можете вставить общий указатель на него в «таблицу соединений» (например std::vector<shared_ptr<Connection> >
). Недостатком было бы то, что тогда становится трудно обрезать соединения: соединения всегда принадлежат, поэтому никогда не освобождаются. (Смотрите weak_ptr
ниже!)
На практике я бы сделал соединение (давайте назовем тип Connection
вместо T
«с этого момента») ответственным: оно может решить, когда другая сторона повесила трубку, или когда произошел обмен, который сообщает о закрытии соединения, или, возможно, даже наступил тайм-аут.
Поскольку последнее очень распространено (соединения часто закрываются автоматически после установленного периода простоя), а также является наиболее гибким (вы можете установить срок действия на «бесконечный»), давайте покажем это:
Примечание: во время моих попыток превратить ваш фрагмент во что-то работающее, я заметил, что, как ни странно, акцептор, похоже, находится ВНУТРИ типа соединения? Это не имеет смысла, потому что как у вас может быть несколько соединений с одним приемником («сервером»)? Поэтому я изменил все, чтобы быть более типичным
Обратите внимание, что я все еще храню таблицу соединений ( mConnections
), но вместо хранения указателей на владение (например shared_ptr<>
) Я храню weak_ptr<>
данные, чтобы мы могли наблюдать за соединениями, не изменяя их срок службы.
Я использовал другой подход к своим обычным ответам и указал на множество деталей в комментариях:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <memory>
#include <iostream>
#include <list>
// Library facilities
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
// Application types
using INT8 = std::int8_t;
using Executor = ba::io_context::executor_type;
using Duration = std::chrono::steady_clock::duration;
struct TCPConnection : public /*!*/ std::enable_shared_from_this<TCPConnection> {
TCPConnection(Executor ex, Duration timeout = 120s)
: mStrand(ex)
, mSocket(mStrand)
, mIdleTime(timeout)
, mIdleTimer{mStrand}
{
// cannot use `shared_from_this` inside the constructor!
}
tcp::socketamp; Socket() { return mSocket; }
~TCPConnection() {
// Demo output, to understand TCPConnection shared lifetime
std::cerr << __PRETTY_FUNCTION__ << std::endl;
}
void StartSession() {
// Here we start the idle timer, so we don't disappear until that
// expires;
// We don't post to the strand because we assume this is the very first
// operation on this object (there can only be 1 thread aware of this
// object)
StartTimer();
}
void TransmitData(INT8 const* pI8Msg)
{
post(mStrand, [this, pI8Msg, self = shared_from_this()] {
std::cout << "TransmitData on " << mSocket.remote_endpoint()
<< std::endl;
// Remove the following line to make the timout also fire if the
// writing takes too long
mIdleTimer.cancel(); // We're busy, not idle
// WARNING: assumes
// - lifetime of pI8Msg guaranteed
// - message NUL-terminated
boost::asio::async_write(
mSocket, ba::buffer(std::basic_string_view(pI8Msg)),
boost::bind(amp;TCPConnection::OnWritten, shared_from_this(),
ba::placeholders::error(),
ba::placeholders::bytes_transferred()));
});
}
private:
void StartTimer() {
mIdleTimer.expires_from_now(mIdleTime);
mIdleTimer.async_wait(boost::bind(amp;TCPConnection::OnTimer,
shared_from_this(),
ba::placeholders::error()));
}
void OnTimer(error_code ec)
{
if (ec != ba::error::operation_aborted) // cancellation is not timeout
{
std::cout << "TCPConnection timeout, closing " << mSocket.remote_endpoint() << std::endl;
// Timeout, let's respond by closing the socket and letting the
// connection disappear
// No need to post to the strand since the completion already
// happens on the strand
mSocket.cancel();
mSocket.shutdown(tcp::socket::shutdown_both);
// No more async operations so shared_from_this goes out of scope
}
}
void OnWritten(error_code ec, size_t bytes_transferred)
{
if (ec.failed()) {
std::cerr << "OnWritten: " << ec.message() << std::endl;
// Error, we let the connection die
// In case the timer wasn't canceled pre-write (see comment in
// TransmitData) let's make sure it's canceled here
mIdleTimer.cancel();
// ignoring errors:
mSocket.shutdown(tcp::socket::shutdown_both, ec);
mSocket.close(ec);
}
// write was successful, let's restart timer
std::cerr << "OnWritten: " << ec.message() << " (" << bytes_transferred
<< " bytes)" << std::endl;
StartTimer();
}
ba::strand<Executor> mStrand; // serialize execution
tcp::socket mSocket;
Duration mIdleTime;
ba::steady_timer mIdleTimer{mStrand, mIdleTime};
};
struct Server {
Server(Executor ex, uint16_t port)
: mExecutor(ex)
, mAcceptor{make_strand(mExecutor), {{}, port}}
{
AcceptLoop();
}
void TransmitData(INT8 const* pI8Msg) {
for (autoamp; weak : mConnections) {
if (auto conn = weak.lock()) {
conn->TransmitData(pI8Msg);
}
}
}
private:
std::shared_ptr<TCPConnection> mConnection;
std::list<std::weak_ptr<TCPConnection> > mConnections;
void AcceptLoop()
{
mConnection = std::make_shared<TCPConnection>(mExecutor, 1s);
mAcceptor.async_accept(mConnection->Socket(),
boost::bind(amp;Server::HandleAccept, this,
boost::asio::placeholders::error()));
}
void HandleAccept(error_code err)
{
if (!err) {
std::cerr << "HandleAccept: "
<< mConnection->Socket().remote_endpoint() << std::endl;
mConnection->StartSession(); // no tramsmit, just a keeping
// the connection open
mConnections.push_back(
mConnection); // store so we can send messages later
// optional: garbage collect the mConnections table
mConnections.remove_if(
std::mem_fn(amp;std::weak_ptr<TCPConnection>::expired));
// Keep accepting connections
AcceptLoop();
}
}
Executor mExecutor;
tcp::acceptor mAcceptor;
};
int main() {
ba::io_context mIo;
Server srv(mIo.get_executor(), 7979);
std::thread delayed_transmission([amp;srv] {
static INT8 const message[] =
"HELLO WORLDn"; // NUL-terminated and static!
std::this_thread::sleep_for(3s);
std::cout << "Sending '" << message << "'" << std::endl;
srv.TransmitData(message);
});
// mIo.run();
mIo.run_for(10s); // timelimited for Coliru
delayed_transmission.join();
}
Это демонстрация живого запуска, демонстрирующая как истекшие по времени соединения, так и успешные TransmitData
вызовы:
- Я установил тайм-аут на 1 сек,
- Передача осуществляется через 3 секунды после запуска сервера (
delayed_transmission
) - Я начинаю один набор из 3 подключений рано, что означает тайм-аут,
- Я немедленно запускаю еще один набор из 3 соединений, как раз вовремя, чтобы получить задержанную передачу
- Общее время работы сервера ограничено 10 секундами для демонстрационных целей
Вывод текста
Для удобства ознакомления (взято из http://coliru.stacked-crooked.com/a/def4f1927c7b975f):
- Сервер
./a.out amp; HandleAccept: 127.0.0.1:43672 HandleAccept: 127.0.0.1:43674 HandleAccept: 127.0.0.1:43676 TCPConnection timeout, closing 127.0.0.1:43672 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43674 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43676 TCPConnection::~TCPConnection() HandleAccept: 127.0.0.1:43678 HandleAccept: 127.0.0.1:43680 HandleAccept: 127.0.0.1:43682 Sending 'HELLO WORLD ' TransmitData on 127.0.0.1:43678 TransmitData on 127.0.0.1:43680 TransmitData on 127.0.0.1:43682 OnWritten: SuccessHELLO WORLD (12 bytes) OnWritten: Success (12 bytes) OnWritten: Success (12 bytes) TCPConnection timeout, closing 127.0.0.1:43678 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43680 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43682 TCPConnection::~TCPConnection()
- Клиенты
sleep 1 (for a in {1..3}; do netcat 127.0.0.1 7979amp; done; time wait) real 0m1.011s user 0m0.012s sys 0m0.004s sleep 0.5 (for a in {1..3}; do netcat 127.0.0.1 7979amp; done; time wait) HELLO WORLD HELLO WORLD HELLO WORLD real 0m1.478s user 0m0.008s sys 0m0.008s
Комментарии:
1. ошибка: нет соответствующей функции для вызова‘boost::asio::basic_stream_socket<boost::asio::ip::tcp>::basic_stream_socket(boost::asio::strand<boost::asio::ip::tcp><boost::asio::io_context::executor_type>amp;)’ , mSocket(mStrand) Почему я получаю эту ошибку? Я использую boost lib 1.66.
2. Да, это старо. Тогда исполнителей так не поддерживали. Что раздражает, потому что вы должны все время помнить о том, чтобы размещать/обертывать пряди. Кроме того, строковое представление не было бы поддержано, что только подтверждает, насколько вам нужно рассмотреть возможность использования более безопасного типа. Вот мое мнение: coliru.stacked-crooked.com/a/62a38e2ca27871c8 (теперь у него есть MAXLEN, потому что простой strlen не работает с INT*)
3. Здесь wandbox.org/permlink/ns0D7eYPybesSqOp и вот godbolt.org/z/nYYG6rd8q вы можете видеть, что он компилируется с Boost 1.66 (хотя там не так много живого теста, как на Coliru). Он уже совместим с c 14.
4. Чтобы получить совместимость с C 11, вам нужно только переписать литералы времени хроно и инициализатор захвата лямбды: godbolt.org/z/azv3sn5T4
5. Уэлп. Чтобы добраться до C 03, требуется гораздо больше работы. И намного больше стимулов 🙂 godbolt.org/z/K1oYqh7Pf Это был мой кусочек возвращения в прошлое на тот день.