Гонка данных при создании вложенного потока

#c #multithreading #pthreads #valgrind

#c #многопоточность #pthreads #valgrind

Вопрос:

При попытке создать вложенные потоки helgrind сообщает о нескольких разных типах гонок данных.

 ==4429== Possible data race during write of size 8 at 0x5673830 by thread #13
==4429== Locks held: none
==4429==    at 0x4C379EF: memset (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x5060C85: get_cached_stack (allocatestack.c:250)
==4429==    by 0x5060C85: allocate_stack (allocatestack.c:501)
==4429==    by 0x5060C85: pthread_create@@GLIBC_2.2.5 (pthread_create.c:537)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429==  Address 0x5673830 is 16 bytes inside a block of size 560 alloc'd
==4429==    at 0x4C2EFB5: calloc (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x40134C4: allocate_dtv (dl-tls.c:322)
==4429==    by 0x40134C4: _dl_allocate_tls (dl-tls.c:544)
==4429==    by 0x50610D2: allocate_stack (allocatestack.c:588)
==4429==    by 0x50610D2: pthread_create@@GLIBC_2.2.5 (pthread_create.c:537)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429==  Block was alloc'd by thread #3


==4429== Possible data race during write of size 1 at 0x724368F by thread #13
==4429== Locks held: none
==4429==    at 0x4C3856C: mempcpy (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x40132F6: _dl_allocate_tls_init (dl-tls.c:520)
==4429==    by 0x5060C8D: get_cached_stack (allocatestack.c:253)
==4429==    by 0x5060C8D: allocate_stack (allocatestack.c:501)
==4429==    by 0x5060C8D: pthread_create@@GLIBC_2.2.5 (pthread_create.c:537)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429== 
==4429== This conflicts with a previous write of size 1 by thread #4
==4429== Locks held: none
==4429==    at 0x5060612: start_thread (pthread_create.c:265)
==4429==  Address 0x724368f is in a rw- anonymous segment


==4429== Possible data race during read of size 8 at 0x7243728 by thread #14
==4429== Locks held: none
==4429==    at 0x40178C: read_record (c_esp.c:171)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429== 
==4429== This conflicts with a previous write of size 8 by thread #13
==4429== Locks held: none
==4429==    at 0x5060DB7: pthread_create@@GLIBC_2.2.5 (pthread_create.c:589)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429==  Address 0x7243728 is in a rw- anonymous segment
  

Упомянутые строки:

  • 318: pthread_create(pth_array k, amp;attrs, read_record, args_array k);

  • 171: void *read_record(void *_args){

Итак, кажется, что при создании потоков происходит гонка данных? Возможно ли, что выделенный стек для потоков в «разных ветвях» перекрывается? Или я где-то облажался?

Я уже пытался уменьшить размер стека для каждого потока, но безуспешно.

Как и было запрошено, минимальный воспроизводимый пример:

 #include <pthread.h>

void *func_b(void *args){
    return args;
}

void *func_a(void *args){
    pthread_t pth[5];
    int i;
    for(i = 0; i < 5; i  ){
        pthread_create(amp;pth[i], NULL, func_b, NULL);
    }

    for(i = 0; i < 5; i  ){
        pthread_join(pth[i], NULL);
    }
    return args;
}

int main(void){
    pthread_t pth[100];
    int i;
    for(i = 0; i < 100; i  ){
        pthread_create(amp;pth[i], NULL, func_a, NULL);
    }

    for(i = 0; i < 100; i  ){
        pthread_join(pth[i], NULL);
    }
}
  

Полный вывод из helgrind находится здесь, и весь соответствующий код находится рядом. Этот фрагмент переходит из строки 171 в строку 477.

 void *read_record(void *_args){
    struct _thread_args *args = (struct _thread_args *)_args;
    uint32_t data_size = 0;

    args->item->record->type = calloc(5, sizeof *args->item->record->type);
    memcpy(args->item->record->type, *args->map, 4);
    *args->map  = 4;
    memcpy(amp;data_size, *args->map, 4);
    *args->map  = 4;
    memcpy(amp;args->item->record->flags, *args->map, 4);
    *args->map  = 4;
    memcpy(amp;args->item->record->id, *args->map, 4);
    *args->map  = 4;
    memcpy(amp;args->item->record->revision, *args->map, 4);
    *args->map  = 4;
    memcpy(amp;args->item->record->version, *args->map, 2);
    *args->map  = 2;
    memcpy(amp;args->item->record->unknown, *args->map, 2);
    *args->map  = 2;

    args->item->record->misc_data = NULL;
    args->item->record->children = NULL;
    args->item->record->last = NULL;
    // args->item->record->next = NULL;
    // args->item->record->previous = NULL;
    args->item->record->_proxy = NULL;
    args->item->record->compression_level = 0x7f;  // some invalid number, doesn't matter

    if(args->parse_records){
        args->item->record->is_parsed = TRUE;
        if(args->item->record->flags amp; 0x00040000){
            uLongf uncompressed_size = 0;
            memcpy(amp;uncompressed_size, *args->map, 4);
            *args->map  = 4;

            if(*(*args->map   1) amp; (1 << 6)){
                if(*(*args->map   1) amp; (1 << 7)){
                    args->item->record->compression_level = Z_BEST_COMPRESSION;  // both hight bits are set  11

                } else{
                    args->item->record->compression_level = Z_BEST_SPEED;  // 01
                }
            } else{
                if(*(*args->map   1) amp; (1 << 7)){
                    args->item->record->compression_level = Z_DEFAULT_COMPRESSION;  // 10

                } else{
                    args->item->record->compression_level = Z_NO_COMPRESSION;  // 00
                }
            }

            uint8_t *uncompressed_data = malloc(uncompressed_size * sizeof *uncompressed_data);
            uint8_t *start = uncompressed_data;
            uncompress(uncompressed_data, amp;uncompressed_size, *args->map, data_size - 4);
            parse_record_data((uint8_t **)amp;uncompressed_data, uncompressed_size, amp;args->item->record->children, amp;args->item->record->last);
            *args->map  = data_size - 4;
            free(start);
        } else{
            parse_record_data(args->map, data_size, amp;args->item->record->children, amp;args->item->record->last);
        }
    } else{
        args->item->record->is_parsed = FALSE;
        args->item->record->misc_data = malloc(sizeof *args->item->record->misc_data);
        args->item->record->misc_data->data = malloc(data_size * sizeof *args->item->record->misc_data->data);
        args->item->record->misc_data->data_size = data_size;
        memcpy(args->item->record->misc_data->data, *args->map, data_size);
        *args->map  = data_size;
    }

    return NULL;
}

void *read_group(void *_args){
    struct _thread_args *args = (struct _thread_args *)_args;
    uint32_t data_size = 0;

    args->item->group->type = "GRUP";
    *args->map  = 4;
    memcpy(amp;data_size, *args->map, 4);
    *args->map  = 4;
    memset(args->item->group->label, 0, 5);
    memcpy(args->item->group->label, *args->map, 4);
    *args->map  = 4;
    memcpy(amp;args->item->group->group_type, *args->map, 4);
    *args->map  = 4;
    memcpy(amp;args->item->group->stamp, *args->map, 2);
    *args->map  = 2;
    memcpy(amp;args->item->group->unknown1, *args->map, 2);
    *args->map  = 2;
    memcpy(amp;args->item->group->version, *args->map, 2);
    *args->map  = 2;
    memcpy(amp;args->item->group->unknown2, *args->map, 2);
    *args->map  = 2;

    args->item->group->children = NULL;
    args->item->group->last = NULL;
    args->item->group->_proxy = NULL;

    uint8_t *data_start = *args->map;
    int item_count = 0;
    while(*args->map < (data_start   data_size - 24)){
        item_count  ;
        char next_type[5] = {0};
        memcpy(next_type, *args->map, 4);
        uint32_t next_data_size = 0;
        memcpy(amp;next_data_size, *args->map   4, 4);
        *args->map  = next_data_size;
        if(strcmp(next_type, "GRUP") != 0){
            *args->map  = 24;
        }
    }
    *args->map = data_start;

    pthread_attr_t  attrs;
    pthread_attr_init(amp;attrs);
    pthread_attr_setstacksize(amp;attrs, 1000);

    struct _group_item *current = NULL;
    bool current_is_record = FALSE;
    pthread_t *pth_array = malloc(item_count * sizeof *pth_array);
    uint8_t **map_array = malloc(item_count * sizeof *map_array);
    struct _thread_args *args_array = malloc(item_count * sizeof *args_array);
    int k;
    for(k = 0; k < item_count; k  ){
        char next_type[5] = {0};
        memcpy(next_type, *args->map, 4);
        uint32_t new_data_size = 0;
        memcpy(amp;new_data_size, *args->map   4, 4);
        *(map_array   k) = *args->map;
        *args->map  = new_data_size;

        (*(args_array   k)).map = map_array   k;
        (*(args_array   k)).parse_records = args->parse_records;
        (*(args_array   k)).item = malloc(sizeof *(*(args_array   k)).item);
        if(strcmp(next_type, "GRUP") == 0){
            (*(args_array   k)).is_record = FALSE;
            (*(args_array   k)).item->group = malloc(sizeof *(*(args_array   k)).item->group);
            pthread_create(pth_array   k, amp;attrs, read_group, args_array   k);
            //read_group(args_array   k);
        } else{
            *args->map  = 24;
            (*(args_array   k)).is_record = TRUE;
            (*(args_array   k)).item->record = malloc(sizeof *(*(args_array   k)).item->record);
            pthread_create(pth_array   k, amp;attrs, read_record, args_array   k);
            //read_record(args_array   k);
        }

        if(args->item->group->children == NULL){
            args->item->group->children = current = (*(args_array   k)).item;
            args->item->group->children_is_record = current_is_record = (*(args_array   k)).is_record;
            if((*(args_array   k)).is_record){
                current->record->previous = NULL;
            } else{
                current->group->previous = NULL;
            }
            continue;
        }

        if((*(args_array   k)).is_record){
            if(current_is_record){
                current->record->next = (*(args_array   k)).item;
                current->record->next_is_record = (*(args_array   k)).is_record;
                (*(args_array   k)).item->record->previous = current;
                (*(args_array   k)).item->record->previous_is_record = current_is_record;
            } else{
                current->group->next = (*(args_array   k)).item;
                current->group->next_is_record = (*(args_array   k)).is_record;
                (*(args_array   k)).item->record->previous = current;
                (*(args_array   k)).item->record->previous_is_record = current_is_record;
            }
        } else{
            if(current_is_record){
                current->record->next = (*(args_array   k)).item;
                current->record->next_is_record = (*(args_array   k)).is_record;
                (*(args_array   k)).item->group->previous = current;
                (*(args_array   k)).item->group->previous_is_record = current_is_record;
            } else{
                current->group->next = (*(args_array   k)).item;
                current->group->next_is_record = (*(args_array   k)).is_record;
                (*(args_array   k)).item->group->previous = current;
                (*(args_array   k)).item->group->previous_is_record = current_is_record;
            }
        }

        current = (*(args_array   k)).item;
        current_is_record = (*(args_array   k)).is_record;
    }

    args->item->group->last = current;
    args->item->group->last_is_record = current_is_record;
    if(current != NULL){
        if(current_is_record){
            current->record->next = NULL;
        } else{
            current->group->next = NULL;
        }
    }

    int i;
    for(i = 0; i < item_count; i  ){
       pthread_join(*(pth_array   i), NULL);
    }

    free(pth_array);
    free(map_array);
    free(args_array);

    assert(*args->map == (data_start   data_size - 24));
    return NULL;
}

Plugin *plugin_read(const char *filename, const bool parse_records){
    Plugin *plugin = malloc(sizeof *plugin);
    plugin->children = NULL;
    plugin->last = NULL;
    plugin->_proxy = NULL;

    FILE *fileobject = fopen(filename, "rb");
    if(fileobject == NULL){
        printf("No file found.n");
        return NULL;
    }
    fseek(fileobject, 0, SEEK_END);
    long int filesize = ftell(fileobject);
    rewind(fileobject);
    uint8_t *map = malloc(filesize * sizeof *map);
    fread(map, filesize, 1, fileobject);
    fclose(fileobject);

    uint8_t *start = map;

    pthread_attr_t  attrs;
    pthread_attr_init(amp;attrs);
    pthread_attr_setstacksize(amp;attrs, 1000);

    pthread_t header_pth;
    plugin->header = malloc(sizeof *plugin->header);
    struct _group_item header_item;
    header_item.record = plugin->header;
    uint8_t *header_map = map;
    struct _thread_args header_args;
    header_args.item = amp;header_item;
    header_args.map = amp;header_map;
    header_args.parse_records = parse_records;
    pthread_create(amp;header_pth, amp;attrs, read_record, amp;header_args);
    uint32_t header_data_size = 0;
    memcpy(amp;header_data_size, map   4, 4);
    map  = header_data_size   24;

    uint8_t *after_header = map;
    int group_count = 0;
    while(map < (start   filesize)){
        group_count  ;
        uint32_t next_data_size = 0;
        memcpy(amp;next_data_size, map   4, 4);
        map  = next_data_size;
    }
    map = after_header;

    pthread_t *pth = malloc(group_count * sizeof *pth);
    uint8_t **map_array = malloc(group_count * sizeof *map_array);
    struct _thread_args *args = malloc(group_count * sizeof *args);
    struct _group_item *current = NULL;
    int k;
    for(k = 0; k < group_count; k  ){
        uint32_t new_data_size = 0;
        memcpy(amp;new_data_size, map   4, 4);
        *(map_array   k) = map;
        map  = new_data_size;

        (*(args   k)).item = malloc(sizeof *(*(args   k)).item);
        (*(args   k)).item->group = malloc(sizeof *(*(args   k)).item->group);
        (*(args   k)).map = map_array   k;
        (*(args   k)).parse_records = parse_records;
        pthread_create(pth   k, amp;attrs, read_group, args   k);

        if(plugin->children == NULL){
            plugin->children = (*(args   k)).item;
            current = (*(args   k)).item;
            current->group->previous = NULL;
            continue;
        }

        current->group->next = (*(args   k)).item;
        current->group->next_is_record = FALSE;
        (*(args   k)).item->group->previous = current;
        (*(args   k)).item->group->previous_is_record = FALSE;
        current = (*(args   k)).item;
    }
    plugin->last = current;
    plugin->last->group->next = NULL;

    pthread_join(header_pth, NULL);
    int i;
    for(i = 0; i < group_count; i  ){
        pthread_join(*(pth   i), NULL);
    }

    assert(map == (start   filesize));

    free(start);
    free(pth);
    free(args);
    free(map_array);

    return plugin;
}
  

Спасибо!

Комментарии:

1. Опубликуйте код здесь. И, пожалуйста, создайте минимальный пример, который воспроизводит проблему.

2. Готово! Спасибо за совет 🙂

3. Помимо того, что не проверяется возвращаемые значения, минимальный пример выглядит корректно. Возможно, вы создали слишком много потоков, не проверили возвращаемое значение при сбое функции pthread и вызвали неопределенное поведение.

4. Позже я попробовал использовать количество потоков в минимальном примере на уровне двух в основном цикле. Мне удалось запустить более ста потоков в этой системе, почему она блокируется на 10? Просто добавил небольшой оператор if для проверки возвращаемых значений, и все они равны 0.

5. Ваш минимальный пример выглядит правильно для меня. С каким условием гонки вы сталкиваетесь в своем минимальном примере?

Ответ №1:

Попробуйте —sim-hints=no-nptl-pthread-stackcache.

См. http://www.valgrind.org/docs/manual/manual-core.html#manual-core.rareopts для получения дополнительной информации.

Комментарии:

1. 1: Эта ложноположительная гипотеза кажется правдоподобной, потому что в тестовом примере минимального воспроизведения нет ничего плохого.

2. Спасибо! С учетом этого мне удалось выяснить, что действительно не так с моими методами: D

3. Интересно, может ли кто-нибудь подключиться и сказать, видели ли они / решили эту проблему на ARM, и использует ли valgrind правильную smc-проверку? В отличие от всех других примеров в stackoverflow, мой фрейм самого высокого уровня — это просто вызов allocate_stack (allocatestack.c: 561). Итак, я предполагаю ложное срабатывание.