Объединение потоков в pyspark, по-видимому, суммирует содержимое фрейма данных

#apache-spark #pyspark #threadpool #threadpoolexecutor

Вопрос:

Я использую объединение потоков, чтобы попытаться распараллелить несколько циклов for, которые у меня есть в моем коде pyspark. Однако, похоже, что при использовании reduce функции в конце она просто суммирует содержимое результирующего списка кадров данных. Вот код, который я использую:

 from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
from pyspark.sql import functions as f

days=[180,5]
    def run_threads(iterr):
        for a in range(len(datepaths)-iterr, len(datepaths)-(iterr-1)):
            path="s3://"   datepaths[a]   "/"
            date_inpath.append(path)
        uva=spark.read.option("basePath",inpath).parquet(*date_inpath)
        .select(*uvakeepcols)
        .filter((col('RESULT').isNotNull()))
        .dropDuplicates()
        uva1=uva.groupBy('SEQ_ID','TOOL_ID')
        .agg(f.count('RESULT').alias('count'))
        return uva1

pool = ThreadPool(cpu_count()-1)
df_list = pool.map(run_threads, days)
pool.close()
pool.join()

df=reduce(DataFrame.unionByName, df_list)
 

Приведенный выше код извлекает результаты определенной SEQ_ID TOOL_ID комбинации в течение 2 дней , а затем подсчитывает, сколько раз RESULT столбец содержит значения.
Когда я использую объединение потоков и сравниваю подсчеты за 2 дня, я получаю следующий результат:

  ------- ------- ----- 
| SEQ_ID|TOOL_ID|count|
 ------- ------- ----- 
|2945783|  15032|  574|
|2945783|  15032|  574|
 ------- ------- ----- 
 

Однако, когда я извлекаю данные вручную из каждого из 2 дней, я получаю следующий результат:

С 180 дней назад:

  ------- ------- ------------- 
| SEQ_ID|TOOL_ID|count(RESULT)|
 ------- ------- ------------- 
|2945783|  15032|          285|
 ------- ------- ------------- 
 

С 5 дней назад:

  ------- ------- ------------- 
| SEQ_ID|TOOL_ID|count(RESULT)|
 ------- ------- ------------- 
|2945783|  15032|          289|
 ------- ------- ------------- 
 

Каким-то образом метод объединения добавляет два счетчика 289 285=574

Что именно здесь происходит и есть ли способ это исправить? Любая информация была бы весьма признательна.

Спасибо

Комментарии:

1. Нет никакого смысла в искре потока. Spark уже является параллельной структурой, и ее параллельный вызов может вызвать проблемы.

2. Понял. Но является ли это ожидаемым поведением объединения потоков независимо от того, используется ли Spark или нет?