Как вызвать метод класса для всех потоков в конце цикла событий для параллельных фьючерсов?

#python #multithreading #io #concurrent.futures

Вопрос:

Предположим, у меня есть следующий класс, который я хочу создать, с атрибутами класса, привязанными к каждому потоку (независимо от того, создан ли concurrent.futures он или нет:

 import pandas as pd 
from concurrent.futures import ThreadPoolExecutor

def ThreadSafeIOClass(threading.local):
    self.df = pd.DataFrame(columns=["col1", "col2"]) 
    self.file_name_prefix = f"my_artifact_{threading.get_ident()}" 

def update_df(row):
    self.df = self.df.append(row, ignore_index=True) 

def flush():
    with open(f"{self.file_name_prefix}.csv", "w") as f:
        self.df.to_csv(f, "w")

def close():
    self.flush() 
 

В типичном цикле concurrent.futures у вас было бы:

    with ThreadPoolExecutor(4) as executor:
       cl = ThreadSafeIOClass()
       for row in rows:
           executor.submit(cl.update_df, row) 
   cl.close() 
 

Теперь меня беспокоит то , что cl.close() я не буду удалять все остатки в памяти из всех созданных потоков concurrent.futures , что приведет к отсутствию некоторых данных в конце шага сохранения.

И мне действительно не нужно блокировать этот класс, но все же, чтобы сделать его потокобезопасным, возможно ли это как-то исправить?

(Простите меня за заимствование термина «цикл событий» в названии, но я думаю, что он выполняет то, чего я хочу достичь здесь.)