POSIX sem_wait() SIGABRT

#c #multithreading #posix #semaphore #sigabrt

#c #многопоточность #posix #семафор #sigabrt

Вопрос:

Я работаю над школьным проектом, в котором мы должны создать многопоточный веб-сервер. У меня возникла проблема, когда при вызове sem_wait моего семафора (который должен быть инициализирован в 0, но, похоже, уже sem_post() изменен на 1). Я получаю SIGABRT.

Я прилагаю свой код ниже и добавляю комментарий к строке, которая вызывает мою проблему. Я провел несколько часов с отладчиком без особой удачи.

 #include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string>
#include <string.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <stdio.h>
#include <cstdlib>
#include <strings.h>

#define PORTNUM 7000
#define NUM_OF_THREADS 5
#define oops(msg) { perror(msg); exit(1);}
#define FCFS 0
#define SJF 1;

void bindAndListen();
void acceptConnection(int socket_file_descriptor);
void* dispatchJobs(void*);
void* replyToClient(void* pos);

//holds ids of worker threads
pthread_t threads[NUM_OF_THREADS];

//mutex variable for sleep_signal_cond
pthread_mutex_t sleep_signal_mutex[NUM_OF_THREADS];
//holds the condition variables to signal when the thread should be unblocked
pthread_cond_t sleep_signal_cond[NUM_OF_THREADS];

//mutex for accessing sleeping_thread_list
pthread_mutex_t sleeping_threads_mutex = PTHREAD_MUTEX_INITIALIZER;
//list of which threads are sleeping so they can be signaled and given a job
std::vector<bool> *sleeping_threads_list = new std::vector<bool>();

//number of threads ready for jobs
sem_t available_threads;
sem_t waiting_jobs;


//holds requests waiting to be given to one of the threads for execution
//request implemented as int[3] with int[0]== socket_descriptor int[1]== file_size int[2]== file_descriptor of requested file
//if file_size == 0 then HEAD request
std::vector<std::vector<int> >* jobs = new std::vector<std::vector<int> >();

pthread_mutex_t jobs_mutex = PTHREAD_MUTEX_INITIALIZER;


int main (int argc, char * const argv[]) {
    //holds id for thread responsible for removing jobs from ready queue and assigning them to worker thread
    pthread_t dispatcher_thread;

    //initializes semaphores
    if(sem_init(amp;available_threads, 0, NUM_OF_THREADS) != 0){
        oops("Error Initializing Semaphore");
    }

    if(sem_init(amp;waiting_jobs, 0, 0) !=0){
        oops("Error Initializing Semaphore");
    }

    //initializes condition variables and guarding mutexes
    for(int i=0; i<NUM_OF_THREADS; i  ){
        pthread_cond_init(amp;sleep_signal_cond[i], NULL);
        pthread_mutex_init(amp;sleep_signal_mutex[i], NULL);
    }

    if(pthread_create(amp;dispatcher_thread, NULL, dispatchJobs, (void*)NULL) !=0){
        oops("Error Creating Distributer Thread");
    }

    for (int i=0; i<NUM_OF_THREADS; i  ) {
        pthread_mutex_lock(amp;sleeping_threads_mutex);
        printf("before");
        sleeping_threads_list->push_back(true);
        printf("after");
        pthread_mutex_unlock(amp;sleeping_threads_mutex);
    }

    printf("here");
    for (int i=0; i<NUM_OF_THREADS; i  ) {
        //creates threads and stores ID in threads
        if(pthread_create(amp;threads[i], NULL, replyToClient, (void*)i) !=0){
            oops("Error Creating Thread");
        }
    }

    /*
    if(sem_init(amp;available_threads, 0, NUM_OF_THREADS) !=0){
        oops("Error Initializing Semaphore");
    }

    if(sem_init(amp;waiting_jobs, 0, 0) !=0){                 //this is the semaphore thats used in the sem_wait
        oops("Error Initializing Semaphore");
    }*/

    bindAndListen();
}


//binds to socket and listens for connections
//being done by main thead
void bindAndListen(){
    struct sockaddr_in saddr;
    struct hostent *hp;
    char hostname[256];
    int sock_id, sock_fd;

    gethostname(hostname, 256);
    hp = gethostbyname(hostname);
    bzero(amp;saddr, sizeof(saddr));

    //errno = 0;

    bcopy(hp->h_addr, amp;saddr.sin_addr, hp->h_length);

    saddr.sin_family = AF_INET;
    saddr.sin_port = htons(PORTNUM);
    saddr.sin_addr.s_addr = INADDR_ANY;

    sock_id = socket(AF_INET, SOCK_STREAM, 0);

    if(sock_id == -1){
        oops("socket");
        printf("socket");
    }

    if(bind(sock_id, (const sockaddr*)amp;saddr, sizeof(saddr)) ==0){

        if(listen(sock_id, 5) ==-1){
            oops("listen");
        }

        //each time a new connection is accepted, get file info and push to ready queue
        while(1){
            int addrlen = sizeof(saddr);
            sock_fd = accept(sock_id, (sockaddr*)amp;saddr, (socklen_t*)amp;addrlen);
            if (sock_fd > 0) {
                acceptConnection(sock_fd);
            }else {
                oops("Error Accepting Connection");
            }
        }
    }else{
        oops("there was an error binding to socket");
    }
}// end of bindAndListen()


//accepts connection and gets file info of requested file
//being done by main thread
void acceptConnection(int sock_fd){
    printf("**Server: A new client connected!");

    //only using loop so on error we can break out on error
    while(true){
        //used to hold input from client
        char* inputBuff = new char[BUFSIZ];
        int slen = read(sock_fd, inputBuff, BUFSIZ);

        //will sit on space between HEAD/GET and path
        int pos1 = 0;
        //will sit on space between path and HTTP version
        int pos2 = 0;

        //need duplicate ptr so we can manipulate one in the loop
        char* buffPtr = inputBuff;
        //parses client input  breaks up query by spaces
        for(int i=0; i<slen; i  ){
            if(*buffPtr == ' '){
                if (pos1 == 0) {
                    pos1 = i;
                }else {
                    pos2 = i;
                    break;
                }
            }
            buffPtr  ;
        }

        if((pos1 - pos2) >=0){
            std::string str = "Invalid Query";
            write(sock_fd, str.c_str(), strlen(str.c_str()));
            break;
        }

        printf("slen length %dn", slen);

        std::string* method = new std::string(inputBuff, pos1);

        printf("method length %lun",method->length());

        //increment the ptr for buff to the starting pos of the path
        inputBuff  =   pos1;

        printf("pos2 - pos1 %dn", (pos2 - pos1));

        printf("pos1 = %d  pos2 = %dn", pos1, pos2);

        std::string* path = new std::string(inputBuff, (pos2 - pos1));

        printf("path length %lun", path->length());

        printf("part1 %sn", method->c_str());

        printf("part2 %sn", path->c_str());

        //opens file requested by client
        int fd = open(path->c_str(), O_RDONLY);
        if(fd < 0){
            std::string* error = new std::string("Error Opening File");
            *error  = *path   std::string(strerror(errno), strlen(strerror(errno)));
            write(sock_fd, error->c_str(), strlen(error->c_str()));
            break;
        }

        int file_size;
        if(method->compare("GET") == 0){
            //gets file info and puts the resulting struct in file_info
            struct stat file_info;
            if(fstat(fd, amp;file_info) !=0){
                oops("Error getting file info");
            }
            file_size = file_info.st_size;
        }else if(method->compare("HEAD")){
            file_size = 0;
        }else{
            write(sock_fd, "Invalid Query", strlen("Invalid Query"));
            break;
        }

        //job to be pushed to ready queue
        std::vector<int> job;
        job.push_back(sock_fd);
        job.push_back(file_size);
        job.push_back(fd);

        //check mutex guarding the ready queue
        pthread_mutex_lock(amp;jobs_mutex);
        //push job to back of ready queue
        jobs->push_back(job);
        //unlock mutex guarding the ready queue
        pthread_mutex_unlock(amp;jobs_mutex);

        //increment number of jobs in ready queue
        sem_post(amp;waiting_jobs);

    } //end of while(true)
      // we only end up here if there was an error
    fflush(stdout);
    close(sock_fd);
}// end of acceptConnection()


//routine run by dispather thread
void *dispatchJobs(void*){
    while(true){
        //wait for a thread to be available to execute a job
        sem_wait(amp;available_threads);
        //wait for a job to be waiting in the ready queue
        sem_wait(amp;waiting_jobs);                    //this is the line thats crashing
        //aquire lock to check which threads are waiting
        pthread_mutex_lock(amp;sleeping_threads_mutex);
        //go through list of threads to see which is waiting
        for(int i=0; i<sleeping_threads_list->size(); i  ){
            if(sleeping_threads_list->at(i)){
                //unlocks lock for access to list of waiting threads
                pthread_mutex_unlock(amp;sleeping_threads_mutex);
                //allows us access to the list of condition variables to signal the thread to resume execution
                pthread_mutex_lock(amp;sleep_signal_mutex[i]);
                pthread_cond_signal(amp;sleep_signal_cond[i]);
                pthread_mutex_unlock(amp;sleep_signal_mutex[i]);
            }
        }

    }//end of while(true)
}//end of dispatchJobs()


//sends file or metadata to client
//run by worker thread
//pos is position of condition variable that it waits to be signaled in the sleep_signal_cond[] array
void* replyToClient(void* pos){
    int position = (long)pos;
    while(true){
        //waits for dispather thread to signal it
        pthread_mutex_lock(amp;sleep_signal_mutex[position]);
        pthread_cond_wait(amp;sleep_signal_cond[position], amp;sleep_signal_mutex[position]);
        pthread_mutex_unlock(amp;sleep_signal_mutex[position]);


        //lock mutex to get job to be executed
        pthread_mutex_lock(amp;jobs_mutex);
        std::vector<int> job = jobs->front();
        //removes job from front of vector
        jobs->erase(jobs->begin());
        //releases mutex
        pthread_mutex_unlock(amp;jobs_mutex);

        //socket file descriptor, used for writing to socket
        int sock_fd =job[0];
        int file_size = job[1];
        //file descriptor for requested job
        int fd = job[2];

        //holds output to be written to socket
        char* outputBuffer = new char[BUFSIZ];

        //GET request, send file
        if(file_size !=0){
            int readResult = 0;
            while ((readResult = read(fd, outputBuffer, BUFSIZ)) > 0) {
                if(write(sock_fd, outputBuffer, readResult) != readResult){
                    printf("We may have a write error");
                }
            }
            if(readResult < 0){
                oops("Error Reading File");
            }
            if(readResult == 0){
                printf("finished sending file");
            }
        }else{    // HEAD request

        }
        //increment number of available threads
        sem_post(amp;available_threads);
    }
}// end of replyToClient()
  

Комментарии:

1. Можно ли сократить ваш пример? ( sscce.org )

Ответ №1:

Проверьте еще раз всю логику кода — можно добраться сюда:

 pthread_mutex_lock(amp;jobs_mutex);
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(amp;jobs_mutex);
  

с jobs->size () == 0 помощью , в этом случае front() и erase() вызвать неопределенное поведение, которое вполне может привести к эффектам, которые вы наблюдаете.

Проверьте, продолжает ли ваша программа аварийно завершать работу после следующего изменения:

 //lock mutex to get job to be executed
pthread_mutex_lock(amp;jobs_mutex);
if (jobs->size () == 0)
  {
    pthread_mutex_unlock (amp;jobs_mutex);
    continue;
  }
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(amp;jobs_mutex);
  

Ответ №2:

Я не использовал семафоры POSIX, но я считаю, что это то, что происходит. Я знаком только с семафорами ядра Linux, и вы не упоминаете свою систему. 3-й параметр функции инициализации, вероятно, задает переменную count . Вы устанавливаете его в 0 (= занято, но другие процессы не ожидают). Вероятно, функция ожидания вызывает функцию down() , которая начинается с уменьшения переменной count на 1: до -1, что означает, что семафор, который вы собираетесь использовать, теперь заблокирован. В вашей программе нет ничего, что могло бы когда-либо разблокировать ее, я полагаю (из просмотра вашего кода — он довольно длинный), так что у вас проблемы. Попробуйте установить для него значение 1 при инициализации. Это может быть все, что нужно.

Комментарии:

1. Что ж, вы действительно разблокируете его, но в областях кода, которые сейчас не достигнуты.