#c #multithreading #posix #semaphore #sigabrt
#c #многопоточность #posix #семафор #sigabrt
Вопрос:
Я работаю над школьным проектом, в котором мы должны создать многопоточный веб-сервер. У меня возникла проблема, когда при вызове sem_wait
моего семафора (который должен быть инициализирован в 0, но, похоже, уже sem_post()
изменен на 1). Я получаю SIGABRT.
Я прилагаю свой код ниже и добавляю комментарий к строке, которая вызывает мою проблему. Я провел несколько часов с отладчиком без особой удачи.
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string>
#include <string.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <stdio.h>
#include <cstdlib>
#include <strings.h>
#define PORTNUM 7000
#define NUM_OF_THREADS 5
#define oops(msg) { perror(msg); exit(1);}
#define FCFS 0
#define SJF 1;
void bindAndListen();
void acceptConnection(int socket_file_descriptor);
void* dispatchJobs(void*);
void* replyToClient(void* pos);
//holds ids of worker threads
pthread_t threads[NUM_OF_THREADS];
//mutex variable for sleep_signal_cond
pthread_mutex_t sleep_signal_mutex[NUM_OF_THREADS];
//holds the condition variables to signal when the thread should be unblocked
pthread_cond_t sleep_signal_cond[NUM_OF_THREADS];
//mutex for accessing sleeping_thread_list
pthread_mutex_t sleeping_threads_mutex = PTHREAD_MUTEX_INITIALIZER;
//list of which threads are sleeping so they can be signaled and given a job
std::vector<bool> *sleeping_threads_list = new std::vector<bool>();
//number of threads ready for jobs
sem_t available_threads;
sem_t waiting_jobs;
//holds requests waiting to be given to one of the threads for execution
//request implemented as int[3] with int[0]== socket_descriptor int[1]== file_size int[2]== file_descriptor of requested file
//if file_size == 0 then HEAD request
std::vector<std::vector<int> >* jobs = new std::vector<std::vector<int> >();
pthread_mutex_t jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
int main (int argc, char * const argv[]) {
//holds id for thread responsible for removing jobs from ready queue and assigning them to worker thread
pthread_t dispatcher_thread;
//initializes semaphores
if(sem_init(amp;available_threads, 0, NUM_OF_THREADS) != 0){
oops("Error Initializing Semaphore");
}
if(sem_init(amp;waiting_jobs, 0, 0) !=0){
oops("Error Initializing Semaphore");
}
//initializes condition variables and guarding mutexes
for(int i=0; i<NUM_OF_THREADS; i ){
pthread_cond_init(amp;sleep_signal_cond[i], NULL);
pthread_mutex_init(amp;sleep_signal_mutex[i], NULL);
}
if(pthread_create(amp;dispatcher_thread, NULL, dispatchJobs, (void*)NULL) !=0){
oops("Error Creating Distributer Thread");
}
for (int i=0; i<NUM_OF_THREADS; i ) {
pthread_mutex_lock(amp;sleeping_threads_mutex);
printf("before");
sleeping_threads_list->push_back(true);
printf("after");
pthread_mutex_unlock(amp;sleeping_threads_mutex);
}
printf("here");
for (int i=0; i<NUM_OF_THREADS; i ) {
//creates threads and stores ID in threads
if(pthread_create(amp;threads[i], NULL, replyToClient, (void*)i) !=0){
oops("Error Creating Thread");
}
}
/*
if(sem_init(amp;available_threads, 0, NUM_OF_THREADS) !=0){
oops("Error Initializing Semaphore");
}
if(sem_init(amp;waiting_jobs, 0, 0) !=0){ //this is the semaphore thats used in the sem_wait
oops("Error Initializing Semaphore");
}*/
bindAndListen();
}
//binds to socket and listens for connections
//being done by main thead
void bindAndListen(){
struct sockaddr_in saddr;
struct hostent *hp;
char hostname[256];
int sock_id, sock_fd;
gethostname(hostname, 256);
hp = gethostbyname(hostname);
bzero(amp;saddr, sizeof(saddr));
//errno = 0;
bcopy(hp->h_addr, amp;saddr.sin_addr, hp->h_length);
saddr.sin_family = AF_INET;
saddr.sin_port = htons(PORTNUM);
saddr.sin_addr.s_addr = INADDR_ANY;
sock_id = socket(AF_INET, SOCK_STREAM, 0);
if(sock_id == -1){
oops("socket");
printf("socket");
}
if(bind(sock_id, (const sockaddr*)amp;saddr, sizeof(saddr)) ==0){
if(listen(sock_id, 5) ==-1){
oops("listen");
}
//each time a new connection is accepted, get file info and push to ready queue
while(1){
int addrlen = sizeof(saddr);
sock_fd = accept(sock_id, (sockaddr*)amp;saddr, (socklen_t*)amp;addrlen);
if (sock_fd > 0) {
acceptConnection(sock_fd);
}else {
oops("Error Accepting Connection");
}
}
}else{
oops("there was an error binding to socket");
}
}// end of bindAndListen()
//accepts connection and gets file info of requested file
//being done by main thread
void acceptConnection(int sock_fd){
printf("**Server: A new client connected!");
//only using loop so on error we can break out on error
while(true){
//used to hold input from client
char* inputBuff = new char[BUFSIZ];
int slen = read(sock_fd, inputBuff, BUFSIZ);
//will sit on space between HEAD/GET and path
int pos1 = 0;
//will sit on space between path and HTTP version
int pos2 = 0;
//need duplicate ptr so we can manipulate one in the loop
char* buffPtr = inputBuff;
//parses client input breaks up query by spaces
for(int i=0; i<slen; i ){
if(*buffPtr == ' '){
if (pos1 == 0) {
pos1 = i;
}else {
pos2 = i;
break;
}
}
buffPtr ;
}
if((pos1 - pos2) >=0){
std::string str = "Invalid Query";
write(sock_fd, str.c_str(), strlen(str.c_str()));
break;
}
printf("slen length %dn", slen);
std::string* method = new std::string(inputBuff, pos1);
printf("method length %lun",method->length());
//increment the ptr for buff to the starting pos of the path
inputBuff = pos1;
printf("pos2 - pos1 %dn", (pos2 - pos1));
printf("pos1 = %d pos2 = %dn", pos1, pos2);
std::string* path = new std::string(inputBuff, (pos2 - pos1));
printf("path length %lun", path->length());
printf("part1 %sn", method->c_str());
printf("part2 %sn", path->c_str());
//opens file requested by client
int fd = open(path->c_str(), O_RDONLY);
if(fd < 0){
std::string* error = new std::string("Error Opening File");
*error = *path std::string(strerror(errno), strlen(strerror(errno)));
write(sock_fd, error->c_str(), strlen(error->c_str()));
break;
}
int file_size;
if(method->compare("GET") == 0){
//gets file info and puts the resulting struct in file_info
struct stat file_info;
if(fstat(fd, amp;file_info) !=0){
oops("Error getting file info");
}
file_size = file_info.st_size;
}else if(method->compare("HEAD")){
file_size = 0;
}else{
write(sock_fd, "Invalid Query", strlen("Invalid Query"));
break;
}
//job to be pushed to ready queue
std::vector<int> job;
job.push_back(sock_fd);
job.push_back(file_size);
job.push_back(fd);
//check mutex guarding the ready queue
pthread_mutex_lock(amp;jobs_mutex);
//push job to back of ready queue
jobs->push_back(job);
//unlock mutex guarding the ready queue
pthread_mutex_unlock(amp;jobs_mutex);
//increment number of jobs in ready queue
sem_post(amp;waiting_jobs);
} //end of while(true)
// we only end up here if there was an error
fflush(stdout);
close(sock_fd);
}// end of acceptConnection()
//routine run by dispather thread
void *dispatchJobs(void*){
while(true){
//wait for a thread to be available to execute a job
sem_wait(amp;available_threads);
//wait for a job to be waiting in the ready queue
sem_wait(amp;waiting_jobs); //this is the line thats crashing
//aquire lock to check which threads are waiting
pthread_mutex_lock(amp;sleeping_threads_mutex);
//go through list of threads to see which is waiting
for(int i=0; i<sleeping_threads_list->size(); i ){
if(sleeping_threads_list->at(i)){
//unlocks lock for access to list of waiting threads
pthread_mutex_unlock(amp;sleeping_threads_mutex);
//allows us access to the list of condition variables to signal the thread to resume execution
pthread_mutex_lock(amp;sleep_signal_mutex[i]);
pthread_cond_signal(amp;sleep_signal_cond[i]);
pthread_mutex_unlock(amp;sleep_signal_mutex[i]);
}
}
}//end of while(true)
}//end of dispatchJobs()
//sends file or metadata to client
//run by worker thread
//pos is position of condition variable that it waits to be signaled in the sleep_signal_cond[] array
void* replyToClient(void* pos){
int position = (long)pos;
while(true){
//waits for dispather thread to signal it
pthread_mutex_lock(amp;sleep_signal_mutex[position]);
pthread_cond_wait(amp;sleep_signal_cond[position], amp;sleep_signal_mutex[position]);
pthread_mutex_unlock(amp;sleep_signal_mutex[position]);
//lock mutex to get job to be executed
pthread_mutex_lock(amp;jobs_mutex);
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(amp;jobs_mutex);
//socket file descriptor, used for writing to socket
int sock_fd =job[0];
int file_size = job[1];
//file descriptor for requested job
int fd = job[2];
//holds output to be written to socket
char* outputBuffer = new char[BUFSIZ];
//GET request, send file
if(file_size !=0){
int readResult = 0;
while ((readResult = read(fd, outputBuffer, BUFSIZ)) > 0) {
if(write(sock_fd, outputBuffer, readResult) != readResult){
printf("We may have a write error");
}
}
if(readResult < 0){
oops("Error Reading File");
}
if(readResult == 0){
printf("finished sending file");
}
}else{ // HEAD request
}
//increment number of available threads
sem_post(amp;available_threads);
}
}// end of replyToClient()
Комментарии:
1. Можно ли сократить ваш пример? ( sscce.org )
Ответ №1:
Проверьте еще раз всю логику кода — можно добраться сюда:
pthread_mutex_lock(amp;jobs_mutex);
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(amp;jobs_mutex);
с jobs->size () == 0
помощью , в этом случае front()
и erase()
вызвать неопределенное поведение, которое вполне может привести к эффектам, которые вы наблюдаете.
Проверьте, продолжает ли ваша программа аварийно завершать работу после следующего изменения:
//lock mutex to get job to be executed
pthread_mutex_lock(amp;jobs_mutex);
if (jobs->size () == 0)
{
pthread_mutex_unlock (amp;jobs_mutex);
continue;
}
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(amp;jobs_mutex);
Ответ №2:
Я не использовал семафоры POSIX, но я считаю, что это то, что происходит. Я знаком только с семафорами ядра Linux, и вы не упоминаете свою систему. 3-й параметр функции инициализации, вероятно, задает переменную count . Вы устанавливаете его в 0 (= занято, но другие процессы не ожидают). Вероятно, функция ожидания вызывает функцию down() , которая начинается с уменьшения переменной count на 1: до -1, что означает, что семафор, который вы собираетесь использовать, теперь заблокирован. В вашей программе нет ничего, что могло бы когда-либо разблокировать ее, я полагаю (из просмотра вашего кода — он довольно длинный), так что у вас проблемы. Попробуйте установить для него значение 1 при инициализации. Это может быть все, что нужно.
Комментарии:
1. Что ж, вы действительно разблокируете его, но в областях кода, которые сейчас не достигнуты.