Лучший способ сохранить выходные данные из многопоточных вызовов функций

#multithreading #julia

#многопоточность #джулия

Вопрос:

У меня есть функция f() , которая возвращает фрейм данных, количество строк в котором я заранее не знаю. Я вызываю f() в многопоточном контексте. Я сохраняю результаты следующим образом:

 results = [DataFrame() for _ in 1:100]

Threads.@threads for hi in 1:100
    results[hi] = f(df)
end
  

Когда я запускаю этот код, использование памяти резко возрастает, предположительно из-за results необходимости постоянно изменять размер самого себя, когда он получает размер фрейма данных [РЕДАКТИРОВАТЬ: это неправда]. Каков наилучший способ предварительного выделения массива результатов, чтобы не занимать память?

**** ОБНОВЛЕНИЕ с помощью MWE ****

 function func(df::DataFrame)
    X = df[:time]
    indices = findall(X .> 0)
end

# read in R data
rds = "blablab.rds"
objs = load(rds);

params = collect(0.5:0.005:0.7);

for i in 1:length(objs)
    cols = [string(name) for name in names(objs.data[i]) if occursin("blabla",string(name))]
    hypers = [(a,b) for a in cols, b in params]

    results = [DataFrame() for _ in 1:length(hypers)]

    # HERE IS WHERE THE MEMORY BLOWS UP
    Threads.@threads for hi in 1:length(hypers)
        name, val = hypers[hi]
        results[hi] = func(objs.data[i])
    end
end
  

df составляет 0,7 ГБ. Когда я запускаю этот фрагмент кода, мое использование памяти увеличивается до ~ 30 ГБ!!! Кажется, что простой доступ к столбцу df inside func() копирует все это целиком?

Комментарии:

1. results массиву не нужно постоянно изменять размер самого себя. На самом деле, я не думаю, что это когда -либо изменяет размер во время цикла. Вы помещаете этот цикл внутри функции? Не могли бы вы поделиться MWE для f() , чтобы мы могли более четко увидеть проблему, вызывающую высокое использование памяти? Кстати, все results удержания — это указатель на ваши DataFrame файлы.

2. F () слишком длинный для публикации, но я сузил проблему до того, где сохраняются результаты f (). Если я просто вызываю f (), но не сохраняю результаты в списке, проблем не возникает

3. Я не думаю, что проблема связана с фрагментом кода в вашем вопросе. Если результаты f() не сохраняются, они могут быть безопасно восстановлены сборщиком мусора, следовательно, использование вашей памяти не увеличивается. Таким образом, это мало что говорит о том, в чем проблема. Возможно, у вас действительно нет никакой проблемы. Возможно, ваши DataFrame файлы действительно большие. Вы можете проверить размер одного из ваших DataFrame ов с помощью Base.summarysize(df) . Создание нового минимального f() может помочь быстро определить проблему.

4. Вы правы. Я опубликовал MWE.

5. func в обновленном MWE возвращает Array набор индексов, каждый из которых является Int64 при условии, что ваша архитектура 64-разрядная. Каждый Int64 занимает 8 байт памяти. Вы сохраняете эти массивы после каждой итерации. Следовательно, объем памяти должен увеличиваться. Величина этого прироста будет зависеть от количества строк в каждой, DataFrame где time больше 0. Доступ к DataFrame столбцу не выделяет память. Доступ DataFrame здесь не проблема. Вы должны изменить свой код, чтобы уменьшить использование памяти. Может быть, удалить findall , просто сохранить результаты как BitArray , т. е. просто использовать (X.> 0)…

Ответ №1:

Пожалуйста, найдите ниже две версии одного и того же кода — однопоточную и многопоточную, генерирующую a DataFrame из набора DataFrame s, возвращаемых f() функцией, и имеющей произвольную длину.

 using Random
using DataFrames
using BenchmarkTools

function f(rngs::Vector{Random.MersenneTwister}, offset)::DataFrame
    t = Threads.threadid()
    n = rand(rngs[t offset], 1:20)
    DataFrame(a=1:n,b=21:(20 n),t=t offset)
end

function test_threads(rngs::Vector{Random.MersenneTwister})
    res = DataFrame([Int,Int,Int],[:a,:b,:t],0)
    lock = Threads.SpinLock()
    Threads.@threads for i in 1:100
        df = f(rngs,0)
        Threads.lock(lock)
        append!(res,df)
        Threads.unlock(lock)
    end
    res
end

function test_normal(rngs::Vector{Random.MersenneTwister})    
    res = DataFrame([Int,Int,Int],[:a,:b,:t],0)    
    for i in 1:100
        append!(res,f(rngs, i%2))
    end
    res
end
  

Теперь давайте проведем тестирование:

 julia> rngs = [Random.MersenneTwister(i) for i in 1:2];

julia> @btime test_normal($rngs);

  891.306 μs (5983 allocations: 476.67 KiB)

rngs = [Random.MersenneTwister(i) for i in 1:Threads.nthreads()];

@btime test_threads($rngs);

  674.559 μs (5549 allocations: 425.69 KiB)
  

Комментарии:

1. Спасибо. Это работает. Теперь я вижу, что проблема не в том, как я сохраняю результаты. Проблема с памятью возникает где-то внутри функции, вызываемой каждым потоком. Создает ли это копию аргумента dataframe каждый раз?

2. в моем коде f() каждый раз создается новый фрейм данных. Однако, append! в Julia, в отличие от Gnu R, просто добавляет элементы в DataFrame , а не воссоздает весь новый DataFrame