Параллельная оптимизация роя частиц в Julia

#multithreading #parallel-processing #thread-safety #julia

#многопоточность #параллельная обработка #безопасность потоков #джулия

Вопрос:

Я написал алгоритм оптимизации роя частиц в Julia и попытался использовать потоки для ускорения вычислений для каждой совокупности. Вот код

 function uniform(dim::Int, lb::Array{Float64, 1}, ub::Array{Float64, 1})
    arr = rand(Float64, dim)
    @inbounds for i in 1:dim; arr[i] = arr[i] * (ub[i] - lb[i])   lb[i] end
    return arr
end

function initialize_particles(problem, population)
    dim = problem.dim
    lb = problem.lb
    ub = problem.ub
    cost_func = problem.cost_func

    gbest_position = uniform(dim, lb, ub)
    gbest = Gbest(gbest_position, cost_func(gbest_position))

    particles = []
    for i in 1:population
        position = uniform(dim, lb, ub)
        velocity = zeros(dim)
        cost = cost_func(position)
        best_position = copy(position)
        best_cost = copy(cost)
        push!(particles, Particle(position, velocity, cost, best_position, best_cost))

        if best_cost < gbest.cost
            gbest.position = copy(best_position)
            gbest.cost = copy(best_cost)
        end
    end
    return gbest, particles
end

function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
    dim = problem.dim
    lb = problem.lb
    ub = problem.ub
    cost_func = problem.cost_func

    gbest, particles = initialize_particles(problem, population)

    # main loop
    for iter in 1:max_iter
        @threads for i in 1:population
            particles[i].velocity .= w .* particles[i].velocity . 
                c1 .* rand(dim) .* (particles[i].best_position .- particles[i].position) . 
                c2 .* rand(dim) .* (gbest.position .- particles[i].position)

            particles[i].position .= particles[i].position .  particles[i].velocity
            particles[i].position .= max.(particles[i].position, lb)
            particles[i].position .= min.(particles[i].position, ub)

            particles[i].cost = cost_func(particles[i].position)

            if particles[i].cost < particles[i].best_cost
                particles[i].best_position = copy(particles[i].position)
                particles[i].best_cost = copy(particles[i].cost)
        
                if particles[i].best_cost < gbest.cost
                    gbest.position = copy(particles[i].best_position)
                    gbest.cost = copy(particles[i].best_cost)
                end
            end
        end
        w = w * wdamp
        if iter % 50 == 1
            println("Iteration " * string(iter) * ": Best Cost = " * string(gbest.cost))
            println("Best Position = " * string(gbest.position))
            println()
        end
    end
    gbest, particles
end
  

Очевидно, что это не является потокобезопасным в основном цикле PSO. Это связано с тем, что при вычислении необходимо изменить позицию gbest.
Я пытался использовать Atomic, но позиция каждой частицы представляет собой массив. Так что этот метод не подходит.
Блокировка нуждается в инициализации, поэтому она также не будет работать.
Я также пытался использовать Floop, но это пошло не так.

Итак, есть ли способ убедиться, что данные заблокированы при выполнении параллельных вычислений?

Спасибо за ваше время!

Ван

Ответ №1:

Нет необходимости блокировать переменные. Когда вы выполняете параллельные вычисления, вы всегда можете мыслить в терминах стратегии разделения-объединения (извините, просто придумайте этот термин по аналогии с вычислением фреймов данных, вы также можете назвать это подходом с уменьшением карты). Идея заключается в том, что вы разделяете вычисления по разным потокам и выполняете их независимо, а на заключительном этапе вы объединяете результаты в одном потоке. Итак, ваш код может выглядеть следующим образом (могут быть синтаксические ошибки, поскольку я не могу запустить код без Particle и других определений, но я надеюсь, что этого достаточно, чтобы дать идею)

 function minby(itr; by=identity, init=nothing)
     init = isnothing(init) ? pop!(itr) : init
     mapreduce(x->(by(x)=>x), (x,y)->(first(x)>first(y)) ? y : x, itr; init=by(init)=>init).second
end

function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
    dim = problem.dim
    lb = problem.lb
    ub = problem.ub
    cost_func = problem.cost_func

    gbest, particles = initialize_particles(problem, population)

    # main loop
    for iter in 1:max_iter
        gbests = fill((gbest.cost, 0), Threads.nthreads())
        @threads for i in 1:population
            particles[i].velocity .= w .* particles[i].velocity . 
                c1 .* rand(dim) .* (particles[i].best_position .- particles[i].position) . 
                c2 .* rand(dim) .* (gbest.position .- particles[i].position)

            particles[i].position .= particles[i].position .  particles[i].velocity
            particles[i].position .= max.(particles[i].position, lb)
            particles[i].position .= min.(particles[i].position, ub)

            particles[i].cost = cost_func(particles[i].position)

            if particles[i].cost < particles[i].best_cost
                particles[i].best_position = copy(particles[i].position)
                particles[i].best_cost = copy(particles[i].cost)
        
                if particles[i].best_cost < gbests[Threads.threadid()][1]
                    gbests[Threads.threadid()] = (particles[i].best_cost, i)
                end
            end
        end
        gbest1 = minby(gbests, by = x -> x[1])
        if gbest1[2] != 0
            idx = gbest1[2]
            gbest.position = copy(particles[idx].best_position)
            gbest.cost = copy(particles[idx].best_cost)
        end
        w = w * wdamp
        if iter % 50 == 1
            println("Iteration " * string(iter) * ": Best Cost = " * string(gbest.cost))
            println("Best Position = " * string(gbest.position))
            println()
        end
    end
    gbest, particles
end
  

Кстати, вы можете обнаружить, что пакет UnPack.jl довольно удобен. Вместо назначений вручную вы можете просто

 using UnPack
function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
    @unpack dim, lb, ub, cost_func = problem
    ....

  

Комментарии:

1. Спасибо, Андрей Оськин! Думаю, я понял вашу идею. Просто сохраните оцененные результаты из разных потоков в разных местах, затем выберите наименьшую пригодность в качестве лучшей в мире. Хотя процесс вычисления немного отличается от предыдущего кода. Предыдущий код изменяет gbest для каждой совокупности, в то время как новый код изменяет gbest для каждой итерации.

2. О, кстати, Unpack package — это хорошо, но как вы находите пакеты такого типа? Я знал, что куча пакетов довольно полезна, но их нет в руководстве и они просто скрываются в сети. Откуда вы знаете, что они могут быть полезны для конкретных требований?

3. К сожалению (или, может быть, к счастью) экосистема Julia развивается очень быстро, поэтому нет установленного набора пакетов, «которые все должны знать». Лучший способ оставаться на связи — присоединиться к сообществу и не стесняться задавать вопросы: discourse, zulip и slack. Вы можете найти ссылки здесь julialang.org/community

4. Да, эта версия обновляется gbest один раз для каждого населения, но разве это не то, что вам действительно нужно? Если i 1 вычисление зависит от i чего, оно не может быть выполнено параллельно. Извините, я мало что знаю о PSO.

5. Спасибо за ваше время, Андрей Оськин! Да, кстати. Я запускал многопоточную версию программы выше десять раз, чтобы убедиться, что смогу получить минимум. И я заметил, что использование памяти на моем ПК будет сильно колебаться при первых трех или четырех запусках, затем оно будет оставаться стабильным до конца запуска. Скорость вычисления также выше для остальной части выполнения. Похоже, что Джулия нашла шаблон работы, а затем оптимизировала использование памяти или процесс сборки мусора. Итак, я хочу знать, может ли Джулия оптимизировать этот процесс вычисления автоматически?