#apache-spark #pyspark #threadpool #threadpoolexecutor
Вопрос:
Я использую объединение потоков, чтобы попытаться распараллелить несколько циклов for, которые у меня есть в моем коде pyspark. Однако, похоже, что при использовании reduce
функции в конце она просто суммирует содержимое результирующего списка кадров данных. Вот код, который я использую:
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
from pyspark.sql import functions as f
days=[180,5]
def run_threads(iterr):
for a in range(len(datepaths)-iterr, len(datepaths)-(iterr-1)):
path="s3://" datepaths[a] "/"
date_inpath.append(path)
uva=spark.read.option("basePath",inpath).parquet(*date_inpath)
.select(*uvakeepcols)
.filter((col('RESULT').isNotNull()))
.dropDuplicates()
uva1=uva.groupBy('SEQ_ID','TOOL_ID')
.agg(f.count('RESULT').alias('count'))
return uva1
pool = ThreadPool(cpu_count()-1)
df_list = pool.map(run_threads, days)
pool.close()
pool.join()
df=reduce(DataFrame.unionByName, df_list)
Приведенный выше код извлекает результаты определенной SEQ_ID
TOOL_ID
комбинации в течение 2 дней , а затем подсчитывает, сколько раз RESULT
столбец содержит значения.
Когда я использую объединение потоков и сравниваю подсчеты за 2 дня, я получаю следующий результат:
------- ------- -----
| SEQ_ID|TOOL_ID|count|
------- ------- -----
|2945783| 15032| 574|
|2945783| 15032| 574|
------- ------- -----
Однако, когда я извлекаю данные вручную из каждого из 2 дней, я получаю следующий результат:
С 180 дней назад:
------- ------- -------------
| SEQ_ID|TOOL_ID|count(RESULT)|
------- ------- -------------
|2945783| 15032| 285|
------- ------- -------------
С 5 дней назад:
------- ------- -------------
| SEQ_ID|TOOL_ID|count(RESULT)|
------- ------- -------------
|2945783| 15032| 289|
------- ------- -------------
Каким-то образом метод объединения добавляет два счетчика 289 285=574
Что именно здесь происходит и есть ли способ это исправить? Любая информация была бы весьма признательна.
Спасибо
Комментарии:
1. Нет никакого смысла в искре потока. Spark уже является параллельной структурой, и ее параллельный вызов может вызвать проблемы.
2. Понял. Но является ли это ожидаемым поведением объединения потоков независимо от того, используется ли Spark или нет?