как «повторно использовать» поток несколько раз в C?

#c #multithreading #sockets #client-server #threadpool

Вопрос:

Мне нужно создать рабочий сервер менеджера, на котором работник обрабатывает только один запрос за раз. В коде, о котором я думал, менеджер хранит файловые дескрипторы в очереди; поток извлекает файловый дескриптор и обрабатывает запрос на него.

Моя проблема в том, что в текущем коде в начале создается N потоков, которые ожидают обработки N запросов; но как только N запросов обработаны, clientFun() функция больше не запускается, поскольку начальные потоки завершили свою работу.

Код сервера:

 #include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include "utils.h"
#include "conn.h"

#define DIM_BUFFER 100
#define N_THREADS 1

struct nodo
{
    int fd;
    struct nodo *prossimoPtr;
};
typedef struct nodo Nodo;
typedef Nodo *NodoPtr;

static Nodo *testaPtr = NULL;
static Nodo *codaPtr = NULL;

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emptyFd = PTHREAD_COND_INITIALIZER;

unsigned int updateMaxSelect(int maxFd, fd_set set);
static void run_server(int pipeW2M_Read);
void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF);
static void *clientFun(void *pipeW2M_WriteF);
int pop(NodoPtr *lPtrF);
void gestioneCoda(int maxFdF, int fd, fd_set set);
void stampa(NodoPtr lPtrF);
void cleanup();

int main()
{
    cleanup();
    atexit(cleanup);

    int pipeW2M[2];
    SYSCALL(pipe(pipeW2M), "Errore: pipe(pipeW2M)")

    pthread_t threadFd[N_THREADS];
    for(int i = 0; i < N_THREADS; i  )
    {
        THREAD_CREATE(amp;threadFd[i], NULL, amp;clientFun, (void *) amp;pipeW2M[WRITE_END], "Thread setId")
    }

    run_server(pipeW2M[READ_END]);
    SYSCALL(close(pipeW2M[WRITE_END]), "Errore: close(pipeW2M[WRITE_END])")
    SYSCALL(close(pipeW2M[READ_END]), "Errore: close(pipeW2M[READ_END])")


    for(int i = 0; i < N_THREADS; i  )
    {
        THREAD_JOIN(threadFd[i], NULL, "Impossibile fare la join: seetId");
    }
}

static void run_server(int pipeW2M_Read)
{
    //Socket di connessione
    int fdSkt;
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")
    struct sockaddr_un sckAddr;
    strncpy(sckAddr.sun_path, SOCKNAME, MAXBACKLOG);
    sckAddr.sun_family = AF_UNIX;
    SYSCALL(bind(fdSkt, (struct sockaddr *) amp;sckAddr, sizeof(sckAddr)), "Errore bind - fdSkt")
    SYSCALL(listen(fdSkt, SOMAXCONN), "Errore listen - fdSkt")

    //Massimo fd attivo
    int maxFd = fdSkt;

    //Inizializzazione set
    fd_set set, readSet;
    FD_ZERO(amp;set);
    FD_SET(fdSkt, amp;set); //FD_SET imposta a 1 il bit corrispondente a fdSkt
    FD_SET(pipeW2M_Read, amp;set);
    if(pipeW2M_Read > maxFd)
    {
        maxFd = pipeW2M_Read;
    }

    int fdSkt_accept;
    while(1)
    {
        readSet = set;
        SYSCALL(select(maxFd   1, amp;readSet, NULL, NULL, NULL), "select(fd_num   1, amp;rdset, NULL, NULL, NULL)")

        for(int i = 0; i <= maxFd; i  )
        {
            if (FD_ISSET(i, amp;readSet))
            {
                if (i == fdSkt)
                {
                    RETURN_SYSCALL(fdSkt_accept, accept(fdSkt, NULL, 0), "fdSkt_accept = accept(fdSkt, NULL, 0)")
                    FD_SET(fdSkt_accept, amp;set);
                    if (fdSkt_accept > maxFd) {
                        maxFd = fdSkt_accept;
                    }
                    continue;
                }

                if(i == pipeW2M_Read)
                {
                    int pipeFdSoccket;
                    SYSCALL(read(pipeW2M_Read, amp;pipeFdSoccket, sizeof(int)), "Errore")
                    printf("%dn", pipeFdSoccket);

                    FD_SET(pipeFdSoccket, amp;set);
                    if(pipeFdSoccket > maxFd)
                        maxFd = pipeFdSoccket;

                    continue;
                }

                gestioneCoda(maxFd, i, set);
            }
        }
    }
    SYSCALL(close(fdSkt), "Errore close - fdSkt")
}

static void *clientFun(void *pipeW2M_WriteF)
{
    puts("Entro");
    int pipeW2M_Write = *((int *) pipeW2M_WriteF);

    LOCK(amp;mutex)
    while (testaPtr == NULL)
    {
        WAIT(amp;emptyFd, amp;mutex)
    }
    int fdAccept = pop(amp;testaPtr);
    printf("Fd in thread: %dn", fdAccept);
    UNLOCK(amp;mutex)

    char buffer[DIM_BUFFER];
    memset(buffer, '', DIM_BUFFER);

    int lenghtRead;
    RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")

    if(lenghtRead == 0)
    {
        SYSCALL(write(pipeW2M_Write, amp;fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, amp;fdAccept, sizeof(int))")
        return NULL;
    }

    //lenghtRead comprende conta tutti i caretteri letti (compreso il '' se è presente)
    for(int i = 0; i < lenghtRead-1; i  )
    {
        buffer[i] = toupper((unsigned char) buffer[i]);
    }
    SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")

    SYSCALL(write(pipeW2M_Write, amp;fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, amp;fdAccept, sizeof(int))")
    puts("Esco");
    return NULL;

}

unsigned int updateMaxSelect(int maxFd, fd_set set)
{
    for(int i = maxFd - 1; i >= 0; i--)
    {
        if(FD_ISSET(i, amp;set))
        {
            return i;
        }
    }
    return -1;
}

void gestioneCoda(int maxFdF, int fd, fd_set set)
{
    LOCK(amp;mutex)

    push(amp;testaPtr, amp;codaPtr, fd);
    FD_CLR(fd, amp;set);
    if(fd == maxFdF)
        maxFdF = updateMaxSelect(fd, set);

    SIGNAL(amp;emptyFd)
    UNLOCK(amp;mutex)
}

void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF)
{
    NodoPtr nuovoPtr = NULL;
    RETURN_NULL_SYSCALL(nuovoPtr, malloc(sizeof(Nodo)), "nuovoPtr = malloc(sizeof(Nodo))")

    nuovoPtr->fd = fdF;
    nuovoPtr->prossimoPtr = NULL;

    if(*testaPtrF == NULL)
    {
        *testaPtrF = nuovoPtr;
        *codaPtrF = nuovoPtr;
    }
    else
    {
        (*codaPtrF)->prossimoPtr = nuovoPtr;
        *codaPtrF = nuovoPtr;
    }
}

int pop(NodoPtr *lPtrF)
{
    if(*lPtrF != NULL)
    {
        int value = (*lPtrF)->fd;
        NodoPtr tempPtr = *lPtrF;
        *lPtrF = (*lPtrF)->prossimoPtr;
        free(tempPtr);

        return value;
    }
    else
    {
        puts("la lista è vuota");
        exit(EXIT_FAILURE);
    }
}

void stampa(NodoPtr lPtrF)
{
    if(lPtrF != NULL)
    {
        printf("Parola: %dn", lPtrF->fd);

        stampa(lPtrF->prossimoPtr);
    }
    else
        puts("NULL");
}

void cleanup()
{
    unlink(SOCKNAME);
}
 

Код клиента:

 #include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include "utils.h"
#include "conn.h"
//Librerie per i socket:
#include <sys/socket.h>
#include <sys/un.h>

#define DIM_BUFFER 256


int main()
{
    int fdSkt;
    //Creazione socket - si usano (quasi) sempre questi parametri
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")


    //Connect
    //sckAddr deve essere uguale a quello del server
    struct sockaddr_un sckAddr;
    strncpy(sckAddr.sun_path, SOCKNAME, 108);
    sckAddr.sun_family = AF_UNIX;

    //il socket potrebbe non aver ancora fatto la listen (per via dello scheduler)
    //il prof nelle correzioni mette solamente: SYSCALL(connect(fdSkt, (struct sockaddr *) amp;sckAddr, sizeof(sckAddr)), "")
    while(connect(fdSkt, (struct sockaddr *) amp;sckAddr, sizeof(sckAddr)) == -1)
    {
        puts("Bloccato");
        if(errno != ENOENT)
        {
            perror("Errore connect - fdSkt");
            exit(EXIT_FAILURE);
        }
    }

    while (1)
    {
        char buffer[DIM_BUFFER];
        memset(buffer, '', DIM_BUFFER);
        SCANF_STRINGA(buffer);

        if(strncmp(buffer, "quit", strlen("quit")) == 0) {
            break;
        }

        int lenghtBuffer = strlen(buffer) 1;
        SYSCALL(writen(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer) 1)")

        SYSCALL(readn(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer) 1)")
        printf("%sn", buffer);

    }

    SYSCALL(close(fdSkt), "Errore close - fdSkt")

    return 0;
}
 

Utils.h code:

 #include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>

#define READ_END 0
#define WRITE_END 1

#define RETURN_SYSCALL(r,c,e) if((r=c)==-1) { perror(e);exit(errno); }
#define SYSCALL(c,e) if(c==-1) { perror(e);exit(errno);}
#define THREAD_CREATE(a, b, c, d, text) if(pthread_create(a, b, c, d) != 0) { perror(text);exit(EXIT_FAILURE);}
#define THREAD_JOIN(a, b, text) if(pthread_join(a, b) != 0) { perror(text);exit(EXIT_FAILURE);}
//usare con le funzioni che ritornano NULL quando falliscono e di cui si vuole memorizzare il valore di ritorno (es: fopen)
#define RETURN_NULL_SYSCALL(retrunVar, fun, text) if((retrunVar=fun) == NULL) { perror(text);exit(errno); }
//usare per le syscall che quando falliscono ritornano un valore != 0
#define SYSCALL_ZERO(syscall, text) if(syscall != 0) {perror(text);exit(errno);}

#define LOCK(l)                                         
if (pthread_mutex_lock(l) != 0)                         
{                                                       
    fprintf(stderr, "ERRORE FATALE lockn");            
    pthread_exit((void*)EXIT_FAILURE);                  
}

#define UNLOCK(l)                                       
if (pthread_mutex_unlock(l) != 0)                       
{                                                       
    fprintf(stderr, "ERRORE FATALE unlockn");          
    pthread_exit((void*)EXIT_FAILURE);                  
}

#define SIGNAL(c)                                       
if (pthread_cond_signal(c) != 0)                        
{                                                       
    fprintf(stderr, "ERRORE FATALE signaln");          
    pthread_exit((void*)EXIT_FAILURE);                  
}

#define WAIT(c, l)                                      
if (pthread_cond_wait(c,l) != 0)                        
{                                                       
    fprintf(stderr, "ERRORE FATALE waitn");            
    pthread_exit((void*)EXIT_FAILURE);                  
}

#define SCANF_STRINGA(stringa)                
if(scanf("%s", stringa) == 0)                 
{                                             
    perror("Impossibile leggere la stringa"); 
    exit(EXIT_FAILURE);                       
}
 

Код соединения h.:

 #if !defined(CONN_H)
#define CONN_H

#include <sys/types.h> 
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

#define SOCKNAME "./cs_sock"
#define MAXBACKLOG 108

/** Evita letture parziali
 *
 *   retval -1   errore (errno settato)
 *   retval  0   se durante la lettura da fd leggo EOF
 *   retval size se termina con successo
 */
static inline int readn(long fd, void *buf, size_t size) {
    size_t left = size;
    int r;
    char *bufptr = (char*)buf;
    while(left>0) {
    if ((r=read((int)fd ,bufptr,left)) == -1) {
        if (errno == EINTR) continue;
        return -1;
    }
    if (r == 0) return 0;   // EOF
        left    -= r;
    bufptr   = r;
    }
    return size;
}

/** Evita scritture parziali
 *
 *   retval -1   errore (errno settato)
 *   retval  0   se durante la scrittura la write ritorna 0
 *   retval  1   se la scrittura termina con successo
 */
static inline int writen(long fd, void *buf, size_t size) {
    size_t left = size;
    int r;
    char *bufptr = (char*)buf;
    while(left>0) {
    if ((r=write((int)fd ,bufptr,left)) == -1) {
        if (errno == EINTR) continue;
        return -1;
    }
    if (r == 0) return 0;  
        left    -= r;
    bufptr   = r;
    }
    return 1;
}


#endif /* CONN_H */
 

Комментарии:

1. Я не собираюсь читать ваш код. Вообще говоря, у вас была бы очередь на работу, в которую вы помещаете «задания». Поток, который не работает, пытается получить мьютекс в рабочей очереди, получить задание и освобождает мьютекс. Не забудьте также заблокировать мьютекс при размещении новых заданий в очереди.

Ответ №1:

Потоки обычно не выполняются до завершения; скорее, они ожидают в исполнительном цикле какого-либо триггерного события, такого как прибытие сообщения или семафора. Структура функции trpical thred такова:

 void* func( void* arg )
{
    // Thread initialisation

    while( !terminated )
    {
        // Block waiting
        ...
 
        // Do stuff (handle event/message for example)
        ...
    }

    // Clean-up
    ...
}
 

Сигнатура функции потока зависит от библиотеки потоков/ОС; например, для потоков pthreads это нормально. Также поток может работать бесконечно с while(1) for(;;) циклом или. Точный механизм завершения (т. е. terminate в моем примере зависит от вас; это не требуется.

Циклы потоков часто являются реализациями конечных автоматов.

Конечно, у вас может быть поток выполнения до завершения, но в этом случае вам придется создавать новый поток для каждого события-это довольно неэффективно.

Вызов блокировки может быть таким же простым, как sleep() вызов для периодических задач. Можно блокировать в нескольких местах на протяжении всей задачи, но это усложняет проектирование и отладку, и шаблон конечного автомата обычно является лучшим решением.

Ответ №2:

Теперь, когда Клиффорд предоставил общую информацию, я рассмотрю конкретные вопросы в вашей программе и предложу соответствующие изменения.

  • Как она, ваша clientFun не удается обработать не только несколько (подряд) клиентских подключений, а также несколько отдельных сообщений от одного соединения (транспортная обработка сообщений от одного соединения в несколько потоков не разумно, так как клиент в любом случае не предназначен для отправки параллельных запросов); в качестве первого шага нам нужно петлю вокруг обработка одно сообщение, я. Эл. линий
         RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")
    …
        SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")
     

    поместите for (; ; ) { … } петлю и замените return NULL; ее на break; .

  • Теперь, чтобы обрабатывать несколько соединений, нам нужна внешняя for (; ; ) { … } петля вокруг
         LOCK(amp;mutex)
    …
        SYSCALL(write(pipeW2M_Write, amp;fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, amp;fdAccept, sizeof(int))")
     
  • Для совместной работы с этими изменениями run_server необходимы корректировки. Серверу больше не нужно следить за принятым дескриптором сокета, поэтому измените
                         FD_SET(fdSkt_accept, amp;set);
     

    Для

                         gestioneCoda(maxFd, fdSkt_accept, set);
     

    и удалите более поздний вызов gestioneCoda , чтобы принятый сокет немедленно передавался рабочему потоку. Наконец-то измениться

                         FD_SET(pipeFdSoccket, amp;set);
     

    Для

                         close(pipeFdSoccket);
     

    — когда соединение завершено, его не следует добавлять в список наблюдения, а скорее закрывать, чтобы избежать утечки дескриптора сокета.

Комментарии:

1. Но разве таким образом работник не обрабатывает все запросы только от одного клиента за раз? В то время как работник должен обрабатывать только один запрос от одного клиента и ждать других запросов (даже от других клиентов).

2. Тем временем я изменил код в соответствии с ответом Клиффорда; теперь, по крайней мере, по-видимому, код, похоже, работает: pastebin.com/11FhFB5y

3. Это полный код сервера: pastebin.com/MeNxjf1t

4. извините, я забыл удалить изменение из кода, теперь я обновил ссылку

5. Не мог бы он тем временем заняться другими клиентами? Например, в этом случае, если есть клиент, который не пишет в течение «очень долгого» времени, поток вместо ожидания может обработать запрос другого клиента.