Одновременное выполнение постоянного количества потоков в C

#c #multithreading #pthreads

#c #многопоточность #pthreads

Вопрос:

Я пытаюсь скопировать файлы из одного каталога в другой, используя pthreads . Каждый поток отвечает за копирование ровно одного файла. Максимальное количество потоков задается с помощью аргумента командной строки.

Что я хочу сделать, так это ЕСЛИ текущие потоки меньше максимальных потоков, создать поток для выполнения работы. В противном случае дождитесь текущих потоков и, когда один из них завершится, уменьшите количество текущих потоков.

Я не могу понять, как дождаться потока через pthread_join , не блокируя основной поток.

Вот что у меня есть до сих пор:

 #include <stdlib.h>
#include <stdio.h>
#include <dirent.h>
#include <stdlib.h>
#include <stdio.h>
#include <dirent.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>

#define MAXNAME 80
#define R_FLAGS O_RDONLY
#define W_FLAGS (O_WRONLY | O_CREAT)
#define W_PERMS (S_IRUSR | S_IWUSR)
#define KILOBYTE 0.001

void *copyfilepass(void *arg);

int main(int argc, char* argv[]){

    int num_threads;        //number of threads to execute in parallel
    int cur_threads = 0;    //number of threads currently executing
    char filename[MAXNAME]; //will temporarily hold the file name
    DIR* source;            //pointer to the source directory
    DIR* dest;              //pointer to the destination directory
    struct dirent* dentry;  //pointer to the internal structure of the source directory
    struct stat fst;        //stats for each file in a directory
    int error;
    void *status;

    //***BEGIN ERROR CHECKING***

    if (argc != 4) {
        fprintf(stderr, "Usage: %s sourceDir destDir numThreadsn", argv[0]);
        return 1; 
    } 

    //check if source directory name is too long
    if ( snprintf(filename, MAXNAME, "%s", argv[1]) == MAXNAME ) {
        fprintf(stderr, "Source directory name %s too longn", argv[1]); 
        return 1;
    }
    //check if destination directory name is too long
    if ( snprintf(filename, MAXNAME, "%s", argv[2]) == MAXNAME ) {
        fprintf(stderr, "Source directory name %s too longn", argv[2]); 
        return 1;
    }

    //check if we can successfully open the source directory
    if( (source = opendir(argv[1])) == NULL ) {
        fprintf(stderr, "Error opening source directory %sn", argv[1]); 
        return 1;
    }

    //check if we can successfully open the destination directory
    if( (dest = opendir(argv[2])) == NULL ) {
        fprintf(stderr, "Error opening destination directory %sn", argv[2]); 
        return 1;
    }

    //***END ERROR CHECKING***

    num_threads = atoi(argv[3]);

    while( (dentry = readdir(source)) != NULL ){
        //source path
        char* path = (char*)malloc(sizeof(char) * (strlen(dentry->d_name)   strlen(argv[1])   2)); //need '.'   '/'   ''
        sprintf(path, "%s%c%s", argv[1], '/', dentry->d_name);

        //destination path
        char* dest_path = (char*)malloc(sizeof(char) * (strlen(dentry->d_name)   strlen(argv[2])   2)); //need '.'   '/'   ''
        sprintf(dest_path, "%s%c%s", argv[2], '/', dentry->d_name);

        if(!stat(path, amp;fst)){ //stat() return 0 if successful
            if(S_ISREG(fst.st_mode)){

                int args[3];
                pthread_t tid;

                if ( (args[0] = open(path, R_FLAGS)) == -1 ) {
                    fprintf(stderr, "Failed to open source file %s: %sn", path, strerror(errno));
                    continue; 
                }
                if ( (args[1] = open(dest_path, W_FLAGS, W_PERMS)) == -1 ) {
                    fprintf(stderr, "Failed to open destination file %s: %sn", dest_path, strerror(errno));
                    continue;
                }

                if(cur_threads < num_threads) {

                      cur_threads;

                    if ( (error = pthread_create((amp;tid), NULL, copyfilepass, args)) ) {
                        --cur_threads;
                        fprintf(stderr, "Failed to create thread: %sn", strerror(error));
                        tid = pthread_self();    /* cannot be value for new thread */
                    }

                    printf("file: %.03fKB %sn", (fst.st_size * KILOBYTE), path);

                }

            }
        }
    }
    //close directory
    closedir(source);

    return 0;

}
  

Комментарии:

1. Если вы сохраняете список своих идентификаторов потоков и предоставляете потокам возможность указывать, когда они завершаются, вы можете не присоединяться к ним, пока они не будут выполнены, и в этом случае pthread_join() они должны вернуться практически сразу. Вы также можете использовать переменные условия, чтобы позволить основному потоку ожидать завершения потока при достижении максимального количества потоков. Вы также можете рассмотреть возможность использования фиксированного количества потоков и заставить их копировать по одному файлу за раз (каждый из очереди), пока не останется ни одного, и в этот момент нормально, если основной поток блокирует их соединение.

2. » … без блокирования основного потока » создайте другой поток, который возьмет на себя управление потоками, объединяя их, извлекая их результат. Для этого вам необходимо отслеживать все запущенные потоки. Только одна pthread_t переменная больше не будет работать.

3. тип, возвращаемый функциями выделения памяти кучи (malloc, calloc, realloc), является void* , поэтому может быть присвоен любому другому указателю. Приведение не требуется и просто загромождает код. Предлагаю удалить приведение при вызовах malloc()

4. в опубликованном коде отсутствует определение функции: copyfilepass()

5. коду не удается передать указатели на пути к именам входных и выходных файлов free() , когда с ними покончено.

Ответ №1:

Лучше, чем порождать и собирать потоки, просто создать пул фиксированного размера в начале и заставить их все использовать из рабочей очереди. Это уменьшит накладные расходы и упростит ваш код.

Кстати, использование потоков для решения этой проблемы может не повысить производительность, в зависимости от файловой системы, с которой вы работаете. Пища для размышлений.