Изучение Ruby threading — запуск события при завершении потока

#ruby #multithreading #concurrency #threadpool

#ruby #многопоточность #параллелизм #пул потоков

Вопрос:

Я новичок в многопоточности, и мне нужна некоторая помощь в понимании идиоматического способа выполнения чего-либо по завершении потока, например, обновления индикатора выполнения. В следующем примере у меня есть несколько списков элементов и подпрограмм для выполнения некоторого «синтаксического анализа» каждого элемента. Я планирую иметь индикатор выполнения для каждого списка, поэтому я хотел бы иметь возможность обновлять процедуру синтаксического анализа каждого списка процент завершенных элементов. Единственная «триггерная» точка, которую я вижу, находится в операторе puts в конце метода sleepy элемента (метод является потоковым). Какова общепринятая стратегия для фиксации завершения, особенно когда область действия находится за пределами метода, запущенного в потоке?

Спасибо!

 # frozen_string_literal: true

require 'concurrent'

$stdout.sync = true

class TheList
  attr_reader :items

  def initialize(list_id, n_items)
    @id = list_id
    @items = []
    n_items.times { |n| @items << Item.new(@id, n) }
  end

  def parse_list(pool)
    @items.each do |item|
      pool.post { item.sleepy(rand(3..8)) }
    end
  end
end

class Item
  attr_reader :id

  def initialize (list_id, item_id)
    @id = item_id
    @list_id = list_id
  end

  def sleepy(seconds)
    sleep(seconds)
    # This puts statement signifies the end of the method threaded
    puts "List ID: #{@list_id} item ID:#{@id} slept for #{seconds} seconds"
  end
end

lists = []
5.times do |i|
  lists << TheList.new(i, rand(5..10))
end

pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)

lists.each do |list|
  list.parse_list(pool)
end
pool.shutdown
pool.wait_for_termination
 

Ответ №1:

На самом деле проблема заключается не в том, чтобы «знать, когда поток завершен», а в том, как вы можете обновить общий индикатор выполнения без условий гонки.

Чтобы объяснить проблему: допустим, у вас была центральная ThreadList#progress_var переменная, и в качестве последней строки каждого потока вы увеличили ее на = . Это привело бы к возникновению условия гонки, поскольку два потока могут выполнять операцию одновременно (и могут перезаписывать результаты друг друга).

Чтобы обойти это, типичным подходом является использование мьютекса, который является важной концепцией для понимания того, изучаете ли вы многопоточность.

Фактическая реализация не так уж и сложна:

 require 'mutex'

class ThreadList
  def initialize
    @semaphore = Mutex.new   
    @progress_bar = 0
  end
  def increment_progress_bar(amount)
    @semaphore.synchronize do
      @progress_bar  = amount
    end
  end 
end
 

Из-за этого @semaphore.synchronize блока теперь вы можете безопасно вызывать этот increment_progress_bar метод из потоков, без риска возникновения состояния гонки.

Комментарии:

1. Спасибо @max-pleaner! Это дало мне довольно хороший скачок в понимании. Я добавил счетчик и @semaphore = Mutex.new в свой инициализатор класса theList, и вместо того, чтобы просто добавлять sleepy в пул, я также увеличил счетчик внутри synchronize. Очень просто. Сейчас я просто пытаюсь понять, как контролировать пул потоков извне, но это уже совсем другой вопрос.