Как добавить данные в очередь с помощью многопоточности (занимает только последнюю строку и дублирует ее)?

#python #multithreading #queue

#python #многопоточность #очередь

Вопрос:

У меня есть файл, содержащий более 1 млн транзакций (csv), который мне нужно очистить от лишних пробелов и подтвердить тип ввода (например: int, float, …).

Я каждый раз передаю фрагменты строк и запускаю поток для обработки данных. Когда поток завершается, требуется обработать еще один фрагмент и так далее, пока фрагменты не будут завершены. но проблема в том, что при окончательном просмотре очереди в ней есть только последняя обработанная строка * количество строк (1,01 млн транзакций).

Я попытался объявить очередь глобально, ничего не изменилось. Я попытался распечатать результаты до того, как они попадут в очередь, в правильных результатах отображается тема, но в моей очереди она указана неправильно. Я пытался использовать глобальный список вместо этого, но это было крайне не рекомендуется из-за характера потоков.

вызов многопоточности:

 threads = []
for chunck in reader:
   threads.append(threading.Thread(target=clean , args=([chunck, queue])))
   threads[-1].start()

for t in threads:
   t.join()
  

функция очистки:

 def clean(i, queue):
    details = {}
    for index, column in i.iterrows():
        for key,val in column.items():
            if isinstance(val, str):
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] =  " ".join(val.split())
            else:
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = val
        queue.put(details)
        # queue.task_done()


    return queue
  

Я ожидаю, что общее количество строк будет очищено и помещено в очередь, чтобы я мог сгенерировать окончательно очищенный csv. но теперь это дает мне файл с 1,01 млн транзакций с тем же значением последней обработанной строки.

Ответ №1:

Поскольку я получил большую помощь от сообщества python в Reddit.

Проблема заключалась в функции очистки, из-за того, что я объявил dict вне первого цикла, который я должен объявить внутри первого цикла.

Код:

  def clean(i, queue):

    for index, column in i.iterrows():
        details = {}
        for key,val in column.items():
            if isinstance(val, str):
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] =  " ".join(val.split())
            else:
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = val
        queue.put(details)
        # queue.task_done()


    return queue
  

Проблема заключалась в том, что dict во всех случаях дублируется всякий раз, когда обработка потоков завершена.