#ruby #multithreading #concurrency #threadpool
#ruby #многопоточность #параллелизм #пул потоков
Вопрос:
Я новичок в многопоточности, и мне нужна некоторая помощь в понимании идиоматического способа выполнения чего-либо по завершении потока, например, обновления индикатора выполнения. В следующем примере у меня есть несколько списков элементов и подпрограмм для выполнения некоторого «синтаксического анализа» каждого элемента. Я планирую иметь индикатор выполнения для каждого списка, поэтому я хотел бы иметь возможность обновлять процедуру синтаксического анализа каждого списка процент завершенных элементов. Единственная «триггерная» точка, которую я вижу, находится в операторе puts в конце метода sleepy элемента (метод является потоковым). Какова общепринятая стратегия для фиксации завершения, особенно когда область действия находится за пределами метода, запущенного в потоке?
Спасибо!
# frozen_string_literal: true
require 'concurrent'
$stdout.sync = true
class TheList
attr_reader :items
def initialize(list_id, n_items)
@id = list_id
@items = []
n_items.times { |n| @items << Item.new(@id, n) }
end
def parse_list(pool)
@items.each do |item|
pool.post { item.sleepy(rand(3..8)) }
end
end
end
class Item
attr_reader :id
def initialize (list_id, item_id)
@id = item_id
@list_id = list_id
end
def sleepy(seconds)
sleep(seconds)
# This puts statement signifies the end of the method threaded
puts "List ID: #{@list_id} item ID:#{@id} slept for #{seconds} seconds"
end
end
lists = []
5.times do |i|
lists << TheList.new(i, rand(5..10))
end
pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)
lists.each do |list|
list.parse_list(pool)
end
pool.shutdown
pool.wait_for_termination
Ответ №1:
На самом деле проблема заключается не в том, чтобы «знать, когда поток завершен», а в том, как вы можете обновить общий индикатор выполнения без условий гонки.
Чтобы объяснить проблему: допустим, у вас была центральная ThreadList#progress_var
переменная, и в качестве последней строки каждого потока вы увеличили ее на =
. Это привело бы к возникновению условия гонки, поскольку два потока могут выполнять операцию одновременно (и могут перезаписывать результаты друг друга).
Чтобы обойти это, типичным подходом является использование мьютекса, который является важной концепцией для понимания того, изучаете ли вы многопоточность.
Фактическая реализация не так уж и сложна:
require 'mutex'
class ThreadList
def initialize
@semaphore = Mutex.new
@progress_bar = 0
end
def increment_progress_bar(amount)
@semaphore.synchronize do
@progress_bar = amount
end
end
end
Из-за этого @semaphore.synchronize
блока теперь вы можете безопасно вызывать этот increment_progress_bar
метод из потоков, без риска возникновения состояния гонки.
Комментарии:
1. Спасибо @max-pleaner! Это дало мне довольно хороший скачок в понимании. Я добавил счетчик и
@semaphore = Mutex.new
в свой инициализатор класса theList, и вместо того, чтобы просто добавлять sleepy в пул, я также увеличил счетчик внутри synchronize. Очень просто. Сейчас я просто пытаюсь понять, как контролировать пул потоков извне, но это уже совсем другой вопрос.