#c #multithreading #sockets #client-server #threadpool
Вопрос:
Мне нужно создать рабочий сервер менеджера, на котором работник обрабатывает только один запрос за раз. В коде, о котором я думал, менеджер хранит файловые дескрипторы в очереди; поток извлекает файловый дескриптор и обрабатывает запрос на него.
Моя проблема в том, что в текущем коде в начале создается N потоков, которые ожидают обработки N запросов; но как только N запросов обработаны, clientFun()
функция больше не запускается, поскольку начальные потоки завершили свою работу.
Код сервера:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include "utils.h"
#include "conn.h"
#define DIM_BUFFER 100
#define N_THREADS 1
struct nodo
{
int fd;
struct nodo *prossimoPtr;
};
typedef struct nodo Nodo;
typedef Nodo *NodoPtr;
static Nodo *testaPtr = NULL;
static Nodo *codaPtr = NULL;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emptyFd = PTHREAD_COND_INITIALIZER;
unsigned int updateMaxSelect(int maxFd, fd_set set);
static void run_server(int pipeW2M_Read);
void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF);
static void *clientFun(void *pipeW2M_WriteF);
int pop(NodoPtr *lPtrF);
void gestioneCoda(int maxFdF, int fd, fd_set set);
void stampa(NodoPtr lPtrF);
void cleanup();
int main()
{
cleanup();
atexit(cleanup);
int pipeW2M[2];
SYSCALL(pipe(pipeW2M), "Errore: pipe(pipeW2M)")
pthread_t threadFd[N_THREADS];
for(int i = 0; i < N_THREADS; i )
{
THREAD_CREATE(amp;threadFd[i], NULL, amp;clientFun, (void *) amp;pipeW2M[WRITE_END], "Thread setId")
}
run_server(pipeW2M[READ_END]);
SYSCALL(close(pipeW2M[WRITE_END]), "Errore: close(pipeW2M[WRITE_END])")
SYSCALL(close(pipeW2M[READ_END]), "Errore: close(pipeW2M[READ_END])")
for(int i = 0; i < N_THREADS; i )
{
THREAD_JOIN(threadFd[i], NULL, "Impossibile fare la join: seetId");
}
}
static void run_server(int pipeW2M_Read)
{
//Socket di connessione
int fdSkt;
RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")
struct sockaddr_un sckAddr;
strncpy(sckAddr.sun_path, SOCKNAME, MAXBACKLOG);
sckAddr.sun_family = AF_UNIX;
SYSCALL(bind(fdSkt, (struct sockaddr *) amp;sckAddr, sizeof(sckAddr)), "Errore bind - fdSkt")
SYSCALL(listen(fdSkt, SOMAXCONN), "Errore listen - fdSkt")
//Massimo fd attivo
int maxFd = fdSkt;
//Inizializzazione set
fd_set set, readSet;
FD_ZERO(amp;set);
FD_SET(fdSkt, amp;set); //FD_SET imposta a 1 il bit corrispondente a fdSkt
FD_SET(pipeW2M_Read, amp;set);
if(pipeW2M_Read > maxFd)
{
maxFd = pipeW2M_Read;
}
int fdSkt_accept;
while(1)
{
readSet = set;
SYSCALL(select(maxFd 1, amp;readSet, NULL, NULL, NULL), "select(fd_num 1, amp;rdset, NULL, NULL, NULL)")
for(int i = 0; i <= maxFd; i )
{
if (FD_ISSET(i, amp;readSet))
{
if (i == fdSkt)
{
RETURN_SYSCALL(fdSkt_accept, accept(fdSkt, NULL, 0), "fdSkt_accept = accept(fdSkt, NULL, 0)")
FD_SET(fdSkt_accept, amp;set);
if (fdSkt_accept > maxFd) {
maxFd = fdSkt_accept;
}
continue;
}
if(i == pipeW2M_Read)
{
int pipeFdSoccket;
SYSCALL(read(pipeW2M_Read, amp;pipeFdSoccket, sizeof(int)), "Errore")
printf("%dn", pipeFdSoccket);
FD_SET(pipeFdSoccket, amp;set);
if(pipeFdSoccket > maxFd)
maxFd = pipeFdSoccket;
continue;
}
gestioneCoda(maxFd, i, set);
}
}
}
SYSCALL(close(fdSkt), "Errore close - fdSkt")
}
static void *clientFun(void *pipeW2M_WriteF)
{
puts("Entro");
int pipeW2M_Write = *((int *) pipeW2M_WriteF);
LOCK(amp;mutex)
while (testaPtr == NULL)
{
WAIT(amp;emptyFd, amp;mutex)
}
int fdAccept = pop(amp;testaPtr);
printf("Fd in thread: %dn", fdAccept);
UNLOCK(amp;mutex)
char buffer[DIM_BUFFER];
memset(buffer, '', DIM_BUFFER);
int lenghtRead;
RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")
if(lenghtRead == 0)
{
SYSCALL(write(pipeW2M_Write, amp;fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, amp;fdAccept, sizeof(int))")
return NULL;
}
//lenghtRead comprende conta tutti i caretteri letti (compreso il '' se è presente)
for(int i = 0; i < lenghtRead-1; i )
{
buffer[i] = toupper((unsigned char) buffer[i]);
}
SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")
SYSCALL(write(pipeW2M_Write, amp;fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, amp;fdAccept, sizeof(int))")
puts("Esco");
return NULL;
}
unsigned int updateMaxSelect(int maxFd, fd_set set)
{
for(int i = maxFd - 1; i >= 0; i--)
{
if(FD_ISSET(i, amp;set))
{
return i;
}
}
return -1;
}
void gestioneCoda(int maxFdF, int fd, fd_set set)
{
LOCK(amp;mutex)
push(amp;testaPtr, amp;codaPtr, fd);
FD_CLR(fd, amp;set);
if(fd == maxFdF)
maxFdF = updateMaxSelect(fd, set);
SIGNAL(amp;emptyFd)
UNLOCK(amp;mutex)
}
void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF)
{
NodoPtr nuovoPtr = NULL;
RETURN_NULL_SYSCALL(nuovoPtr, malloc(sizeof(Nodo)), "nuovoPtr = malloc(sizeof(Nodo))")
nuovoPtr->fd = fdF;
nuovoPtr->prossimoPtr = NULL;
if(*testaPtrF == NULL)
{
*testaPtrF = nuovoPtr;
*codaPtrF = nuovoPtr;
}
else
{
(*codaPtrF)->prossimoPtr = nuovoPtr;
*codaPtrF = nuovoPtr;
}
}
int pop(NodoPtr *lPtrF)
{
if(*lPtrF != NULL)
{
int value = (*lPtrF)->fd;
NodoPtr tempPtr = *lPtrF;
*lPtrF = (*lPtrF)->prossimoPtr;
free(tempPtr);
return value;
}
else
{
puts("la lista è vuota");
exit(EXIT_FAILURE);
}
}
void stampa(NodoPtr lPtrF)
{
if(lPtrF != NULL)
{
printf("Parola: %dn", lPtrF->fd);
stampa(lPtrF->prossimoPtr);
}
else
puts("NULL");
}
void cleanup()
{
unlink(SOCKNAME);
}
Код клиента:
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include "utils.h"
#include "conn.h"
//Librerie per i socket:
#include <sys/socket.h>
#include <sys/un.h>
#define DIM_BUFFER 256
int main()
{
int fdSkt;
//Creazione socket - si usano (quasi) sempre questi parametri
RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")
//Connect
//sckAddr deve essere uguale a quello del server
struct sockaddr_un sckAddr;
strncpy(sckAddr.sun_path, SOCKNAME, 108);
sckAddr.sun_family = AF_UNIX;
//il socket potrebbe non aver ancora fatto la listen (per via dello scheduler)
//il prof nelle correzioni mette solamente: SYSCALL(connect(fdSkt, (struct sockaddr *) amp;sckAddr, sizeof(sckAddr)), "")
while(connect(fdSkt, (struct sockaddr *) amp;sckAddr, sizeof(sckAddr)) == -1)
{
puts("Bloccato");
if(errno != ENOENT)
{
perror("Errore connect - fdSkt");
exit(EXIT_FAILURE);
}
}
while (1)
{
char buffer[DIM_BUFFER];
memset(buffer, '', DIM_BUFFER);
SCANF_STRINGA(buffer);
if(strncmp(buffer, "quit", strlen("quit")) == 0) {
break;
}
int lenghtBuffer = strlen(buffer) 1;
SYSCALL(writen(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer) 1)")
SYSCALL(readn(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer) 1)")
printf("%sn", buffer);
}
SYSCALL(close(fdSkt), "Errore close - fdSkt")
return 0;
}
Utils.h code:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#define READ_END 0
#define WRITE_END 1
#define RETURN_SYSCALL(r,c,e) if((r=c)==-1) { perror(e);exit(errno); }
#define SYSCALL(c,e) if(c==-1) { perror(e);exit(errno);}
#define THREAD_CREATE(a, b, c, d, text) if(pthread_create(a, b, c, d) != 0) { perror(text);exit(EXIT_FAILURE);}
#define THREAD_JOIN(a, b, text) if(pthread_join(a, b) != 0) { perror(text);exit(EXIT_FAILURE);}
//usare con le funzioni che ritornano NULL quando falliscono e di cui si vuole memorizzare il valore di ritorno (es: fopen)
#define RETURN_NULL_SYSCALL(retrunVar, fun, text) if((retrunVar=fun) == NULL) { perror(text);exit(errno); }
//usare per le syscall che quando falliscono ritornano un valore != 0
#define SYSCALL_ZERO(syscall, text) if(syscall != 0) {perror(text);exit(errno);}
#define LOCK(l)
if (pthread_mutex_lock(l) != 0)
{
fprintf(stderr, "ERRORE FATALE lockn");
pthread_exit((void*)EXIT_FAILURE);
}
#define UNLOCK(l)
if (pthread_mutex_unlock(l) != 0)
{
fprintf(stderr, "ERRORE FATALE unlockn");
pthread_exit((void*)EXIT_FAILURE);
}
#define SIGNAL(c)
if (pthread_cond_signal(c) != 0)
{
fprintf(stderr, "ERRORE FATALE signaln");
pthread_exit((void*)EXIT_FAILURE);
}
#define WAIT(c, l)
if (pthread_cond_wait(c,l) != 0)
{
fprintf(stderr, "ERRORE FATALE waitn");
pthread_exit((void*)EXIT_FAILURE);
}
#define SCANF_STRINGA(stringa)
if(scanf("%s", stringa) == 0)
{
perror("Impossibile leggere la stringa");
exit(EXIT_FAILURE);
}
Код соединения h.:
#if !defined(CONN_H)
#define CONN_H
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#define SOCKNAME "./cs_sock"
#define MAXBACKLOG 108
/** Evita letture parziali
*
* retval -1 errore (errno settato)
* retval 0 se durante la lettura da fd leggo EOF
* retval size se termina con successo
*/
static inline int readn(long fd, void *buf, size_t size) {
size_t left = size;
int r;
char *bufptr = (char*)buf;
while(left>0) {
if ((r=read((int)fd ,bufptr,left)) == -1) {
if (errno == EINTR) continue;
return -1;
}
if (r == 0) return 0; // EOF
left -= r;
bufptr = r;
}
return size;
}
/** Evita scritture parziali
*
* retval -1 errore (errno settato)
* retval 0 se durante la scrittura la write ritorna 0
* retval 1 se la scrittura termina con successo
*/
static inline int writen(long fd, void *buf, size_t size) {
size_t left = size;
int r;
char *bufptr = (char*)buf;
while(left>0) {
if ((r=write((int)fd ,bufptr,left)) == -1) {
if (errno == EINTR) continue;
return -1;
}
if (r == 0) return 0;
left -= r;
bufptr = r;
}
return 1;
}
#endif /* CONN_H */
Комментарии:
1. Я не собираюсь читать ваш код. Вообще говоря, у вас была бы очередь на работу, в которую вы помещаете «задания». Поток, который не работает, пытается получить мьютекс в рабочей очереди, получить задание и освобождает мьютекс. Не забудьте также заблокировать мьютекс при размещении новых заданий в очереди.
Ответ №1:
Потоки обычно не выполняются до завершения; скорее, они ожидают в исполнительном цикле какого-либо триггерного события, такого как прибытие сообщения или семафора. Структура функции trpical thred такова:
void* func( void* arg )
{
// Thread initialisation
while( !terminated )
{
// Block waiting
...
// Do stuff (handle event/message for example)
...
}
// Clean-up
...
}
Сигнатура функции потока зависит от библиотеки потоков/ОС; например, для потоков pthreads это нормально. Также поток может работать бесконечно с while(1)
for(;;)
циклом или. Точный механизм завершения (т. е. terminate
в моем примере зависит от вас; это не требуется.
Циклы потоков часто являются реализациями конечных автоматов.
Конечно, у вас может быть поток выполнения до завершения, но в этом случае вам придется создавать новый поток для каждого события-это довольно неэффективно.
Вызов блокировки может быть таким же простым, как sleep()
вызов для периодических задач. Можно блокировать в нескольких местах на протяжении всей задачи, но это усложняет проектирование и отладку, и шаблон конечного автомата обычно является лучшим решением.
Ответ №2:
Теперь, когда Клиффорд предоставил общую информацию, я рассмотрю конкретные вопросы в вашей программе и предложу соответствующие изменения.
- Как она, ваша
clientFun
не удается обработать не только несколько (подряд) клиентских подключений, а также несколько отдельных сообщений от одного соединения (транспортная обработка сообщений от одного соединения в несколько потоков не разумно, так как клиент в любом случае не предназначен для отправки параллельных запросов); в качестве первого шага нам нужно петлю вокруг обработка одно сообщение, я. Эл. линийRETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)") … SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")
поместите
for (; ; ) { … }
петлю и заменитеreturn NULL;
ее наbreak;
. - Теперь, чтобы обрабатывать несколько соединений, нам нужна внешняя
for (; ; ) { … }
петля вокругLOCK(amp;mutex) … SYSCALL(write(pipeW2M_Write, amp;fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, amp;fdAccept, sizeof(int))")
- Для совместной работы с этими изменениями
run_server
необходимы корректировки. Серверу больше не нужно следить за принятым дескриптором сокета, поэтому изменитеFD_SET(fdSkt_accept, amp;set);
Для
gestioneCoda(maxFd, fdSkt_accept, set);
и удалите более поздний вызов
gestioneCoda
, чтобы принятый сокет немедленно передавался рабочему потоку. Наконец-то изменитьсяFD_SET(pipeFdSoccket, amp;set);
Для
close(pipeFdSoccket);
— когда соединение завершено, его не следует добавлять в список наблюдения, а скорее закрывать, чтобы избежать утечки дескриптора сокета.
Комментарии:
1. Но разве таким образом работник не обрабатывает все запросы только от одного клиента за раз? В то время как работник должен обрабатывать только один запрос от одного клиента и ждать других запросов (даже от других клиентов).
2. Тем временем я изменил код в соответствии с ответом Клиффорда; теперь, по крайней мере, по-видимому, код, похоже, работает: pastebin.com/11FhFB5y
3. Это полный код сервера: pastebin.com/MeNxjf1t
4. извините, я забыл удалить изменение из кода, теперь я обновил ссылку
5. Не мог бы он тем временем заняться другими клиентами? Например, в этом случае, если есть клиент, который не пишет в течение «очень долгого» времени, поток вместо ожидания может обработать запрос другого клиента.