#multithreading #julia
#многопоточность #джулия
Вопрос:
У меня есть функция f()
, которая возвращает фрейм данных, количество строк в котором я заранее не знаю. Я вызываю f()
в многопоточном контексте. Я сохраняю результаты следующим образом:
results = [DataFrame() for _ in 1:100]
Threads.@threads for hi in 1:100
results[hi] = f(df)
end
Когда я запускаю этот код, использование памяти резко возрастает, предположительно из-за results
необходимости постоянно изменять размер самого себя, когда он получает размер фрейма данных [РЕДАКТИРОВАТЬ: это неправда]. Каков наилучший способ предварительного выделения массива результатов, чтобы не занимать память?
**** ОБНОВЛЕНИЕ с помощью MWE ****
function func(df::DataFrame)
X = df[:time]
indices = findall(X .> 0)
end
# read in R data
rds = "blablab.rds"
objs = load(rds);
params = collect(0.5:0.005:0.7);
for i in 1:length(objs)
cols = [string(name) for name in names(objs.data[i]) if occursin("blabla",string(name))]
hypers = [(a,b) for a in cols, b in params]
results = [DataFrame() for _ in 1:length(hypers)]
# HERE IS WHERE THE MEMORY BLOWS UP
Threads.@threads for hi in 1:length(hypers)
name, val = hypers[hi]
results[hi] = func(objs.data[i])
end
end
df
составляет 0,7 ГБ. Когда я запускаю этот фрагмент кода, мое использование памяти увеличивается до ~ 30 ГБ!!! Кажется, что простой доступ к столбцу df
inside func()
копирует все это целиком?
Комментарии:
1.
results
массиву не нужно постоянно изменять размер самого себя. На самом деле, я не думаю, что это когда -либо изменяет размер во время цикла. Вы помещаете этот цикл внутри функции? Не могли бы вы поделиться MWE дляf()
, чтобы мы могли более четко увидеть проблему, вызывающую высокое использование памяти? Кстати, всеresults
удержания — это указатель на вашиDataFrame
файлы.2. F () слишком длинный для публикации, но я сузил проблему до того, где сохраняются результаты f (). Если я просто вызываю f (), но не сохраняю результаты в списке, проблем не возникает
3. Я не думаю, что проблема связана с фрагментом кода в вашем вопросе. Если результаты
f()
не сохраняются, они могут быть безопасно восстановлены сборщиком мусора, следовательно, использование вашей памяти не увеличивается. Таким образом, это мало что говорит о том, в чем проблема. Возможно, у вас действительно нет никакой проблемы. Возможно, вашиDataFrame
файлы действительно большие. Вы можете проверить размер одного из вашихDataFrame
ов с помощьюBase.summarysize(df)
. Создание нового минимальногоf()
может помочь быстро определить проблему.4. Вы правы. Я опубликовал MWE.
5.
func
в обновленном MWE возвращаетArray
набор индексов, каждый из которых являетсяInt64
при условии, что ваша архитектура 64-разрядная. КаждыйInt64
занимает 8 байт памяти. Вы сохраняете эти массивы после каждой итерации. Следовательно, объем памяти должен увеличиваться. Величина этого прироста будет зависеть от количества строк в каждой,DataFrame
гдеtime
больше 0. Доступ кDataFrame
столбцу не выделяет память. ДоступDataFrame
здесь не проблема. Вы должны изменить свой код, чтобы уменьшить использование памяти. Может быть, удалитьfindall
, просто сохранить результаты какBitArray
, т. е. просто использовать (X.> 0)…
Ответ №1:
Пожалуйста, найдите ниже две версии одного и того же кода — однопоточную и многопоточную, генерирующую a DataFrame
из набора DataFrame
s, возвращаемых f()
функцией, и имеющей произвольную длину.
using Random
using DataFrames
using BenchmarkTools
function f(rngs::Vector{Random.MersenneTwister}, offset)::DataFrame
t = Threads.threadid()
n = rand(rngs[t offset], 1:20)
DataFrame(a=1:n,b=21:(20 n),t=t offset)
end
function test_threads(rngs::Vector{Random.MersenneTwister})
res = DataFrame([Int,Int,Int],[:a,:b,:t],0)
lock = Threads.SpinLock()
Threads.@threads for i in 1:100
df = f(rngs,0)
Threads.lock(lock)
append!(res,df)
Threads.unlock(lock)
end
res
end
function test_normal(rngs::Vector{Random.MersenneTwister})
res = DataFrame([Int,Int,Int],[:a,:b,:t],0)
for i in 1:100
append!(res,f(rngs, i%2))
end
res
end
Теперь давайте проведем тестирование:
julia> rngs = [Random.MersenneTwister(i) for i in 1:2];
julia> @btime test_normal($rngs);
891.306 μs (5983 allocations: 476.67 KiB)
rngs = [Random.MersenneTwister(i) for i in 1:Threads.nthreads()];
@btime test_threads($rngs);
674.559 μs (5549 allocations: 425.69 KiB)
Комментарии:
1. Спасибо. Это работает. Теперь я вижу, что проблема не в том, как я сохраняю результаты. Проблема с памятью возникает где-то внутри функции, вызываемой каждым потоком. Создает ли это копию аргумента dataframe каждый раз?
2. в моем коде
f()
каждый раз создается новый фрейм данных. Однако,append!
в Julia, в отличие от Gnu R, просто добавляет элементы вDataFrame
, а не воссоздает весь новыйDataFrame