Структурирование критической секции для многопоточности

#c #multithreading #file #producer-consumer #critical-section

#c #многопоточность #файл #производитель-потребитель #критическая секция

Вопрос:

У меня есть поток-производитель, который считывает каждый символ и его смещение из исходного файла, а затем записывает его в общий циклический буфер.

У меня также есть поток-потребитель, который считывает самый старый элемент в буфере, а затем записывает его в файл копирования.

Количество потоков производителя и потребителя указывается в качестве аргумента командной строки, также каждый раз, когда поток производителя / потребителя считывает или записывает в буфер, или читает или записывает в файл, определенная строка записывается в файл журнала.

Прямо сейчас у меня есть один критический раздел для потоков производителя и потребителя. Как я могу структурировать ее так, чтобы минимизировать время, затрачиваемое на критическую секцию (она становится медленной). Я думал о наличии нескольких критических разделов для моих потоков производителя и потребителя. Например, в моем потоке-производителе у меня может быть критический раздел для чтения из исходного файла и записи определенной строки (например, «чтение из файла») в файл журнала, и другой критический раздел для записи в буфер и записи определенной строки (например, «запись в буфер») в файл журнала. Но как это меняет ситуацию, если критические секции расположены одна за другой?

Вот мой поток производителя:

 void *INthread(void *arg)
{
    printf("INSIDE INthreadn");
    FILE *srcFile = (FILE*)arg;
    FILE *lp; // Log file pointers.
    int t_id = INid  ; // Thread number.
    int curOffset;
    BufferItem resu<
    struct timespec t;

    t.tv_sec = 0;
    t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS 1);
    nanosleep(amp;t, NULL);

    fseek(srcFile, 0, SEEK_CUR);
    curOffset = ftell(srcFile); // Save the current byte offset.

    fseek(srcFile, 0, SEEK_END);
    if(len == 0) // If len hasnt been set by the first IN thread yet.
        len = ftell(srcFile); // Save the length of the srcFile (number of chars).

    fseek(srcFile, curOffset, SEEK_SET); // Revert file pointer to curOffset.
    printf("ID %d number of bytes %dn", t_id, len);

    int offs;
    int ch;
    while(len > 0) // Go through each byte/char in file.
    {

        /*** CRITICAL SECTION ********************************/
        sem_wait(amp;empty); /* acquire the empty lock */
        pthread_mutex_lock( amp;pt_mutex );
        if(len > 0){
            fseek(srcFile, 0, SEEK_CUR);
            if((offs = ftell(srcFile)) != -1)
                result.offset = offs;     /* get position of byte in file */
            if((ch = fgetc(srcFile)) != EOF)
                result.data = ch;       /* read byte from file */

            // Write to log file "read_byte PTn Ox Bb I-1".
            if (!(lp = fopen(log, "a"))) {
                printf("could not open log file for writing");
            }
            if(fprintf(lp, "read_byte PT%d O%d B%d I-1n", t_id, offs, ch) < 0){
                printf("could not write to log file");
            }
            printf("ID %d --- offset %d char %c len%dn", t_id, result.offset, result.data, len);
            addItem(amp;cBuff, amp;result);

            // Write to log file "produce PTn Ox Bb Ii  ".
            if(fprintf(lp, "produce PT%d O%d B%d I%dn", t_id, offs, ch, cBuff.lastInd) < 0){
                printf("could not write to log file");
            }
            fclose(lp);

            len--;
        }
        pthread_mutex_unlock( amp;pt_mutex );
        sem_post(amp;full); /* signal full */
        /*** END CRITICAL SECTION ********************************/

        t.tv_sec = 0;
        t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS 1);
        nanosleep(amp;t, NULL);
    }

    inJoin[t_id] = 1; // This IN thread is ready to be joined.
    printf("EXIT INthreadn");
    pthread_exit(0);
}
  

Вот мой потребительский поток:

 void *OUTthread(void *arg)
{
    printf("INSIDE OUTthreadn");
    struct timespec t;
    t.tv_sec = 0;
    t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS 1);
    nanosleep(amp;t, NULL);

    int processing = 1;
    FILE *targetFile, *lp;
    BufferItem OUTresu<
    int t_id = OUTid  ;
    int offs, ch;
    int numBytes = len;

    while(processing){

        /*** CRITICAL SECTION ********************************/
        sem_wait(amp;full); /* acquire the full lock */
        pthread_mutex_lock( amp;pt_mutex );
        cbRead(amp;cBuff, amp;OUTresult);
        offs = OUTresult.offset;
        ch = OUTresult.data;

        if (!(lp = fopen(log, "a"))) {
            printf("could not open log file for writing");
        }
        // Write to log file "consume CTn Ox Bb Ii".
        if(fprintf(lp, "consume CT%d O%d B%d I%dn", t_id, offs, ch, cBuff.lastInd) < 0){
            printf("could not write to log file");
        }

        printf("From buffer: offset %d char %cn", OUTresult.offset, OUTresult.data);
        if (!(targetFile = fopen(arg, "r "))) {
            printf("could not open output file for writing");
        }
        if (fseek(targetFile, OUTresult.offset, SEEK_SET) == -1) {
            fprintf(stderr, "error setting output file position to %un",
                    (unsigned int) OUTresult.offset);
            exit(-1);
        }
        if (fputc(OUTresult.data, targetFile) == EOF) {
            fprintf(stderr, "error writing byte %d to output filen", OUTresult.data);
            exit(-1);
        }

        // Write to log file "write_byte CTn Ox Bb I-1".
        if(fprintf(lp, "write_byte CT%d O%d B%d I-1n", t_id, offs, ch) < 0){
            printf("could not write to log file");
        }
        fclose(lp);
        fclose(targetFile);

        pthread_mutex_unlock( amp;pt_mutex );
        sem_post(amp;empty); /* signal empty */
        /*** END CRITICAL SECTION ********************************/

        t.tv_sec = 0;
        t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS 1);
        nanosleep(amp;t, NULL);

    }

    outJoin[t_id] = 1; // This OUT thread is ready to be joined.
    printf("EXIT OUTthreadn");
    pthread_exit(0);

}
  

Ответ №1:

Я бы организовал это так, чтобы ваш код принимал форму:

 ... processing ...
mutex lock
resource read / write
mutex unlock
... continue processing
  

вокруг каждого ресурса, который должен быть общим. Таким образом, в конечном итоге у вас будет несколько мьютексов, один для производителей, читающих файл, другой для производителей, записывающих в циклический буфер. Один для потребителей, считывающих из циклического буфера … доб. И каждый из них будет инкапсулировать одну операцию чтения / записи в свой уважаемый ресурс.

Я бы также сделал так, чтобы ваш циклический буфер не мог перезаписывать сам себя, иначе вы не сможете одновременно читать и записывать в буфер, создавая только один мьютекс для обеих операций чтения / записи, эффективно увеличивая время ожидания потока потребителя и производителя.

Возможно, это не самое «лучшее» решение, но оно должно обрабатываться быстрее, чем при гигантской блокировке большей части вашего кода.