#multithreading #parallel-processing #thread-safety #julia
#многопоточность #параллельная обработка #безопасность потоков #джулия
Вопрос:
Я написал алгоритм оптимизации роя частиц в Julia и попытался использовать потоки для ускорения вычислений для каждой совокупности. Вот код
function uniform(dim::Int, lb::Array{Float64, 1}, ub::Array{Float64, 1})
arr = rand(Float64, dim)
@inbounds for i in 1:dim; arr[i] = arr[i] * (ub[i] - lb[i]) lb[i] end
return arr
end
function initialize_particles(problem, population)
dim = problem.dim
lb = problem.lb
ub = problem.ub
cost_func = problem.cost_func
gbest_position = uniform(dim, lb, ub)
gbest = Gbest(gbest_position, cost_func(gbest_position))
particles = []
for i in 1:population
position = uniform(dim, lb, ub)
velocity = zeros(dim)
cost = cost_func(position)
best_position = copy(position)
best_cost = copy(cost)
push!(particles, Particle(position, velocity, cost, best_position, best_cost))
if best_cost < gbest.cost
gbest.position = copy(best_position)
gbest.cost = copy(best_cost)
end
end
return gbest, particles
end
function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
dim = problem.dim
lb = problem.lb
ub = problem.ub
cost_func = problem.cost_func
gbest, particles = initialize_particles(problem, population)
# main loop
for iter in 1:max_iter
@threads for i in 1:population
particles[i].velocity .= w .* particles[i].velocity .
c1 .* rand(dim) .* (particles[i].best_position .- particles[i].position) .
c2 .* rand(dim) .* (gbest.position .- particles[i].position)
particles[i].position .= particles[i].position . particles[i].velocity
particles[i].position .= max.(particles[i].position, lb)
particles[i].position .= min.(particles[i].position, ub)
particles[i].cost = cost_func(particles[i].position)
if particles[i].cost < particles[i].best_cost
particles[i].best_position = copy(particles[i].position)
particles[i].best_cost = copy(particles[i].cost)
if particles[i].best_cost < gbest.cost
gbest.position = copy(particles[i].best_position)
gbest.cost = copy(particles[i].best_cost)
end
end
end
w = w * wdamp
if iter % 50 == 1
println("Iteration " * string(iter) * ": Best Cost = " * string(gbest.cost))
println("Best Position = " * string(gbest.position))
println()
end
end
gbest, particles
end
Очевидно, что это не является потокобезопасным в основном цикле PSO. Это связано с тем, что при вычислении необходимо изменить позицию gbest.
Я пытался использовать Atomic, но позиция каждой частицы представляет собой массив. Так что этот метод не подходит.
Блокировка нуждается в инициализации, поэтому она также не будет работать.
Я также пытался использовать Floop, но это пошло не так.
Итак, есть ли способ убедиться, что данные заблокированы при выполнении параллельных вычислений?
Спасибо за ваше время!
Ван
Ответ №1:
Нет необходимости блокировать переменные. Когда вы выполняете параллельные вычисления, вы всегда можете мыслить в терминах стратегии разделения-объединения (извините, просто придумайте этот термин по аналогии с вычислением фреймов данных, вы также можете назвать это подходом с уменьшением карты). Идея заключается в том, что вы разделяете вычисления по разным потокам и выполняете их независимо, а на заключительном этапе вы объединяете результаты в одном потоке. Итак, ваш код может выглядеть следующим образом (могут быть синтаксические ошибки, поскольку я не могу запустить код без Particle
и других определений, но я надеюсь, что этого достаточно, чтобы дать идею)
function minby(itr; by=identity, init=nothing)
init = isnothing(init) ? pop!(itr) : init
mapreduce(x->(by(x)=>x), (x,y)->(first(x)>first(y)) ? y : x, itr; init=by(init)=>init).second
end
function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
dim = problem.dim
lb = problem.lb
ub = problem.ub
cost_func = problem.cost_func
gbest, particles = initialize_particles(problem, population)
# main loop
for iter in 1:max_iter
gbests = fill((gbest.cost, 0), Threads.nthreads())
@threads for i in 1:population
particles[i].velocity .= w .* particles[i].velocity .
c1 .* rand(dim) .* (particles[i].best_position .- particles[i].position) .
c2 .* rand(dim) .* (gbest.position .- particles[i].position)
particles[i].position .= particles[i].position . particles[i].velocity
particles[i].position .= max.(particles[i].position, lb)
particles[i].position .= min.(particles[i].position, ub)
particles[i].cost = cost_func(particles[i].position)
if particles[i].cost < particles[i].best_cost
particles[i].best_position = copy(particles[i].position)
particles[i].best_cost = copy(particles[i].cost)
if particles[i].best_cost < gbests[Threads.threadid()][1]
gbests[Threads.threadid()] = (particles[i].best_cost, i)
end
end
end
gbest1 = minby(gbests, by = x -> x[1])
if gbest1[2] != 0
idx = gbest1[2]
gbest.position = copy(particles[idx].best_position)
gbest.cost = copy(particles[idx].best_cost)
end
w = w * wdamp
if iter % 50 == 1
println("Iteration " * string(iter) * ": Best Cost = " * string(gbest.cost))
println("Best Position = " * string(gbest.position))
println()
end
end
gbest, particles
end
Кстати, вы можете обнаружить, что пакет UnPack.jl довольно удобен. Вместо назначений вручную вы можете просто
using UnPack
function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
@unpack dim, lb, ub, cost_func = problem
....
Комментарии:
1. Спасибо, Андрей Оськин! Думаю, я понял вашу идею. Просто сохраните оцененные результаты из разных потоков в разных местах, затем выберите наименьшую пригодность в качестве лучшей в мире. Хотя процесс вычисления немного отличается от предыдущего кода. Предыдущий код изменяет gbest для каждой совокупности, в то время как новый код изменяет gbest для каждой итерации.
2. О, кстати, Unpack package — это хорошо, но как вы находите пакеты такого типа? Я знал, что куча пакетов довольно полезна, но их нет в руководстве и они просто скрываются в сети. Откуда вы знаете, что они могут быть полезны для конкретных требований?
3. К сожалению (или, может быть, к счастью) экосистема Julia развивается очень быстро, поэтому нет установленного набора пакетов, «которые все должны знать». Лучший способ оставаться на связи — присоединиться к сообществу и не стесняться задавать вопросы: discourse, zulip и slack. Вы можете найти ссылки здесь julialang.org/community
4. Да, эта версия обновляется
gbest
один раз для каждого населения, но разве это не то, что вам действительно нужно? Еслиi 1
вычисление зависит отi
чего, оно не может быть выполнено параллельно. Извините, я мало что знаю о PSO.5. Спасибо за ваше время, Андрей Оськин! Да, кстати. Я запускал многопоточную версию программы выше десять раз, чтобы убедиться, что смогу получить минимум. И я заметил, что использование памяти на моем ПК будет сильно колебаться при первых трех или четырех запусках, затем оно будет оставаться стабильным до конца запуска. Скорость вычисления также выше для остальной части выполнения. Похоже, что Джулия нашла шаблон работы, а затем оптимизировала использование памяти или процесс сборки мусора. Итак, я хочу знать, может ли Джулия оптимизировать этот процесс вычисления автоматически?