#python #multithreading #queue
#python #многопоточность #очередь
Вопрос:
У меня есть файл, содержащий более 1 млн транзакций (csv), который мне нужно очистить от лишних пробелов и подтвердить тип ввода (например: int, float, …).
Я каждый раз передаю фрагменты строк и запускаю поток для обработки данных. Когда поток завершается, требуется обработать еще один фрагмент и так далее, пока фрагменты не будут завершены. но проблема в том, что при окончательном просмотре очереди в ней есть только последняя обработанная строка * количество строк (1,01 млн транзакций).
Я попытался объявить очередь глобально, ничего не изменилось. Я попытался распечатать результаты до того, как они попадут в очередь, в правильных результатах отображается тема, но в моей очереди она указана неправильно. Я пытался использовать глобальный список вместо этого, но это было крайне не рекомендуется из-за характера потоков.
вызов многопоточности:
threads = []
for chunck in reader:
threads.append(threading.Thread(target=clean , args=([chunck, queue])))
threads[-1].start()
for t in threads:
t.join()
функция очистки:
def clean(i, queue):
details = {}
for index, column in i.iterrows():
for key,val in column.items():
if isinstance(val, str):
details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = " ".join(val.split())
else:
details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = val
queue.put(details)
# queue.task_done()
return queue
Я ожидаю, что общее количество строк будет очищено и помещено в очередь, чтобы я мог сгенерировать окончательно очищенный csv. но теперь это дает мне файл с 1,01 млн транзакций с тем же значением последней обработанной строки.
Ответ №1:
Поскольку я получил большую помощь от сообщества python в Reddit.
Проблема заключалась в функции очистки, из-за того, что я объявил dict вне первого цикла, который я должен объявить внутри первого цикла.
Код:
def clean(i, queue):
for index, column in i.iterrows():
details = {}
for key,val in column.items():
if isinstance(val, str):
details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = " ".join(val.split())
else:
details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = val
queue.put(details)
# queue.task_done()
return queue
Проблема заключалась в том, что dict во всех случаях дублируется всякий раз, когда обработка потоков завершена.