Отправка сокета Linux C / C в многопоточном коде

#linux #multithreading #sockets

#linux #многопоточность #сокеты

Вопрос:

Что произойдет, если в многопоточном коде несколько потоков одновременно пытаются отправить данные в сокет tcp? будут ли их данные смешаны или разные потоки будут отправлять данные один за другим?

Комментарии:

1. Как вы пишете в сокет? Используете ли вы write(2) или какую-либо абстракцию C ?

2. спасибо, пожалуйста, смотрите Ниже мои комментарии, это send / sendto для пакетов ~ 100 байт

Ответ №1:

Это зависит от того, какие примитивы вы используете для отправки данных в сокет.

Если вы используете write(2) , send(2) , sendto(2) , или sendmsg(2) и размер вашего сообщения достаточно мал, чтобы полностью поместиться в буферах ядра для сокета, тогда вся запись будет отправлена в виде блока без вкрапления других данных.

Если вы используете fwrite(3) (или любую другую абстракцию ввода-вывода с буферизацией более высокого уровня), то есть вероятность, что ваши данные будут отправлены без вкрапления каких-либо других данных, но я бы не стал полагаться на такое поведение.

Я не могу говорить о sendfile(2) поведении. Я хотел бы думать, что sendfile(2) операция «записывает» все содержимое файла в сокет перед любыми другими write(2) запросами в сокете, но в документации, которую я прочитал, об этом не говорится ни слова, поэтому вам лучше не делать предположение, что оно в каком-либо смысле «атомарное».

Самый безопасный механизм заключается в том, что только один поток когда-либо отправляет данные в сокет.

Комментарии:

1. я использую send / sendto, каждый раз, когда поток пытается отправить некоторые данные, размер данных невелик, примерно <200 байт. в настоящее время я использую mutex_lock для синхронизации разных потоков перед вызовом send, я пытаюсь выяснить, могу ли я удалить mutex_lock для повышения производительности.

2. В этом случае я ожидаю, что удаление мьютекса, вероятно, безопасно. Но обязательно проверьте send(2) возврат для -1 , errno == EMSGSIZE , на случай, если вы отправляете данные быстрее, чем сеть может их доставить.

3. В документах POSIX мало что говорится о потокобезопасности этих вызовов, кроме поведения блочного / неблокирующего ввода-вывода. В ядре Linux есть сообщение об ошибке для многопоточных вызовов для записи (2) с файлом, но я не знаю, исправлено ли это. Но, безусловно, системные вызовы обеспечивают гораздо более высокую гарантию (подразумеваемую или нет) атомарности, чем что-либо выше них.

4. Исправление к моему предыдущему комментарию: POSIX требует, чтобы все системные вызовы, кроме указанного набора, были потокобезопасными: pubs.opengroup.org/onlinepubs/007908799/xsh/threads.html

5. PIPE_BUF имеет значение только в том случае, если вы записываете в канал. Если вы пишете в сокет, на самом деле нет никакой гарантии. Используйте мьютекс.

Ответ №2:

Действительно, как сказано в предыдущем ответе: «Самый безопасный механизм заключается в том, чтобы только один поток когда-либо отправлял данные в сокет».

Но если вы хотите, чтобы несколько потоков вызывали чтение / запись, вы должны сами гарантировать потокобезопасность. Ниже я написал оболочку для отправки и recv, которая делает это за вас.

Send() и Recv() являются потокобезопасными в том смысле, что они не приведут к сбою, но нет никакой гарантии, что эти данные не будут «перемешаны» с данными, отправленными из других потоков. Поскольку мы, скорее всего, этого не хотим, мы должны блокировать, пока не будут подтверждены все данные потока, отправленные или полученные, или с ошибкой. Такой длительный блокирующий вызов может быть проблематичным, поэтому убедитесь, что вы делаете это в потоках, которые могут обрабатывать длительные операции блокировки.

Для linux:

 #include <sys/types.h>
#include <sys/socket.h>
#include <map>
#include <mutex>



/* blocks until the full amount of bytes requested are read
 * thread safe
 * throws exception on error */
void recv_bytes(int sock, char* buf, int len, int flags){

    static std::map<int, std::mutex> mtx;

    std::lock_guard<std::mutex> lock(mtx[sock]);

    int bytes_received = 0;

    while (bytes_received != len){

        int bytes = recv(sock, buf   bytes_received, len - bytes_received, flags);

        //error check
        if (bytes == -1){
            throw std::runtime_error("Network Exception");
        }

        bytes_received  = bytes;

    } 
}



/* blocks until the full amount of bytes requested are sent
 * thread safe
 * throws exception on error */
void send_bytes(int sock, char* buf, int len, int flags){

    static std::map<int, std::mutex> mtx;

    std::lock_guard<std::mutex> lock(mtx[sock]);

    int bytes_sent = 0; 

    while (bytes_sent != len){

        int bytes_s0 = send(sock, buf, len, flags);

        if (bytes_sent == -1) {
            throw std::runtime_error("Network Exception");
        }

        bytes_sent  = bytes_s0;

    }
}
 

Комментарии:

1. Выбрасывание исключения и использование mutex.lock()? Если вы создадите исключение, этот код больше никогда не сможет выполняться! Пожалуйста, используйте lock_guard

2. Ах, действительно, я пропустил разблокировку. lock_guards — отличное изобретение.

3. При некоторых обстоятельствах вызов блокирующей функции ввода-вывода при удержании мьютекса может привести к тому, что ваша программа перестанет отвечать на запросы в течение длительного периода. (например, если удаленный клиент не реагирует, вызов recv() может блокироваться в течение длительного времени, в течение которого вызывающий поток, конечно, будет заблокирован,как и любые другие потоки, которые пытались получить мьютекс)

4. Программа перестанет отвечать, только если у вас нет выделенного потока пользовательского интерфейса.

5. Я понимаю, что эти вызовы не решаются блокировать дольше, чем они уже делают, но OP спрашивает о многопоточных сокетах, и предполагается, что у него уже есть выделенный поток пользовательского интерфейса.