Как выполнять потоки по порядку в Ruby

#ruby #multithreading #mutex

Вопрос:

У меня есть задание, в котором у меня есть 2 числа (a и b) и количество (n). Мне нужно сначала сгенерировать случайные числа, которые будут временем, в течение которого каждый поток будет спать. Затем каждый поток вычисляет сумму, разницу, произведение и деление (соответственно) a и b и повторяет это n раз. Например:

 a = 10
b = 2
n = 2

SUM: 12
DIFFERENCE: 8
PRODUCT: 20
DIVISION: 5

SUM: 12
DIFFERENCE: 8
PRODUCT: 20
DIVISION: 5
 

и между каждой строкой программа спит несколько секунд. Порядок должен быть sum, difference, product и division. Я не могу использовать очереди или уничтожать / создавать потоки повторно. Моей первой мыслью было использовать 4 условные переменные, чтобы диктовать порядок, в котором должны выполняться потоки. Я придумал это:

 mutex = Mutex.new
resources = Array.new(4) { ConditionVariable.new }

t1 = Thread.new do
  mutex.synchronize do
    puts "Thread 1"
    sleep(3)
    resources[0].signal
  end
end

t2 = Thread.new do
  mutex.synchronize do
    resources[0].wait(mutex)
    puts "Thread 2"
    sleep(3)
    resources[1].signal
  end
end

t3 = Thread.new do
  mutex.synchronize do
    resources[1].wait(mutex)
    puts "Thread 3"
    sleep(3)
    resources[2].signal
  end
end

t4 = Thread.new do
  mutex.synchronize do
    resources[2].wait(mutex)
    puts "Thread 4"
    sleep(3)
  end
end

t1.join
t2.join
t3.join
t4.join
 

но я получаю эту ошибку взаимоблокировки: main.rb:39:in 'join': No live threads left. Deadlock? (fatal)

Может ли этот подход работать? Что я должен сделать, чтобы это исправить? Есть ли другие лучшие подходы, которые могли бы работать?

Ответ №1:

Я не эксперт по ruby, но на любом другом языке, который я использовал, название «переменная условия» является неправильным. Для всего остального, что называется «переменной», мы ожидаем, что если один поток изменит ее, какой-нибудь другой поток может прийти позже и увидеть, что она была изменена. Переменные условия работают не так.

Когда поток A «уведомляет / сигнализирует» переменную условия, он «разбудит» какой-то другой поток, который уже ожидал, но если в этот момент не произошло ожидание другого потока, то сигнал / уведомление абсолютно ничего не делает. вообще.

Условные переменные не запоминают уведомления.

Вот что, я думаю, может произойти:

t1 Поток блокирует мьютекс, а затем переходит в спящий режим.

Все остальные три потока запускаются, и все блокируются в ожидании мьютекса.

t1 Поток возвращается из sleep(3) , и он сигнализирует о переменной условия. Но переменные условия не запоминают уведомления. Ни один из других потоков не смог добраться до их wait(mutex) вызовов, потому что все они все еще пытаются пройти мимо mutex.synchronize . Уведомление потеряно.

t1 Поток покидает синхронизированный блок, другие потоки входят в свои синхронизированные блоки, один за другим, пока все они не будут ожидать сигналов.

Между тем, основной поток зависает t1.join() . Этот вызов возвращается, когда t1 поток завершается, но затем основной поток вызывает t2.join() t2 ожидание сигнала, t3 ожидает сигнала, t4 ожидает сигнала, а основной поток ожидает t2 завершения.

Больше никаких живых потоков.


Опять же, не эксперт по ruby, но на любом другом языке поток, который использует переменную условия для ожидания некоторого «условия», должен делать что-то вроде этого:

 # The mutex prevents other threads from modifying the "condition"
# (i.e., prevents them from modifying the `sharedData`.)
mutex.lock()

while ( sharedData.doesNotSatisfyTheCondition() ) {

    # The `wait()` call _temporarily_ unlocks the mutex so that other
    # threads may make the condition become true, but it's _guaranteed_
    # to re-lock the mutex before it returns.
    conditionVar.wait(mutex)
}

# At this point, the condition is _guaranteed_ to be true.
sharedData.doSomethingThatRequiresTheConditionToBeTrue()

mutex.unlock()
 

Самое важное, что здесь происходит, — это то, что вызывающий не ждет, если условие уже выполнено. Если условие уже выполнено, то уведомление, вероятно, уже произошло. Мы упустили это, и если мы будем ждать этого сейчас, то можем ждать вечно.

Другая важная вещь заключается в том, что после того, как мы дождались и получили уведомление, мы снова проверяем условие. В зависимости от правил языка программирования, операционной системы и архитектуры программы; может быть возможно wait() преждевременный возврат.


Сделать условие истинным просто:

 mutex.lock()
sharedData.doSomethingThatMakesTheConditionTrue()
conditionVar.notify()
mutex.unlock()
 

Комментарии:

1. Думаю, теперь я лучше понимаю проблему. Я сделал это , чтобы посмотреть, смогу ли я заставить потоки выполняться по порядку, и это работает большую часть времени, но время от времени я получаю некоторые ошибки взаимоблокировки. Что может быть причиной этого?

2. Да, это выглядит лучше.

3. Я заменил сигнал на широковещательный, и, похоже, теперь он работает нормально. Можете ли вы подтвердить, что это правильно?