Завершение потоков при выполнении условия

#multithreading #rust #concurrency

Вопрос:

Я хочу эффективно искать ключи двух хэш-карт для одного значения и завершать оба потока, как только значение будет найдено. В настоящее время я делаю это, используя два отдельных канала сообщений (т. Е. Два передатчика и два приемника), но я не уверен, что это правильный подход. Учитывая, что компонент «mpsc» mpsc::channel означает «несколько производителей, один потребитель», кажется неправильным иметь несколько производителей и нескольких потребителей. Итак, есть ли лучший способ одновременного поиска в двух массивах?

Мой код также доступен на игровой площадке:

 use std::collections::HashMap;
use std::array::IntoIter;
use std::thread;
use std::time::Duration;
use std::iter::FromIterator;
use std::sync::mpsc;

fn main() {
    let m1 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7,8), (9, 10)]));
    let m2 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7,8), (9, 10), (11, 12), (13, 14), (15, 16), (17,18), (19, 20)]));

    let (tx1, rx1) = mpsc::channel::<u8>();
    let (tx2, rx2) = mpsc::channel::<u8>();

    let handle1 = thread::spawn(move || {
        let iter_keys1 = m1.keys();
        for k in iter_keys1 {
            if k.clone() == 11u8 {
                tx2.send(*k);
                break
            } else {
                println!("Key from handle1: {}", k);
            }
            thread::sleep(Duration::from_millis(1));
        }
        for received in rx1 {
            let into: u8 = received;
            if into == 11u8 {
                println!("handle2 sent a message to receiver1: {}", into);
                break
            }
        }
        m1
    });

    let handle2 = thread::spawn(move || {
        let iter_keys2 = m2.keys();
        for k in iter_keys2 {
            if k.clone() == 11u8 {
                tx1.send(*k);
                break
            } else {
                println!("Key from handle2: {}", k);
            }
            thread::sleep(Duration::from_millis(1));
        }
        for received in rx2 {
            let into: u8 = received;
            if into == 11u8 {
                println!("handle1 sent a message to receiver2: {}", into);
                break
            }
        }
        m2
    });
    handle1.join().unwrap();
    handle2.join().unwrap();
}

 

Несколько связанный с этим вопрос: Есть ли практическая причина для использования sleep , или это просто облегчает просмотр результатов параллельной обработки на небольших выборках? Когда я комментирую thread::sleep(Duration::from_millis(1)); строки, кажется, что потоки обрабатываются последовательно:

 Key from handle1: 9
Key from handle1: 5
Key from handle1: 3
Key from handle1: 1
Key from handle1: 7
Key from handle2: 1
handle2 sent a message to receiver1: 11
 
Осветление:

Я пытаюсь найти ключ, который мог бы существовать в двух разных хэш-картах. В этом примере я ищу 11 в обоих наборах ключей и хочу завершить оба потока, когда найду его в любом из наборов ключей.

Комментарии:

1. Поскольку вы пишете, а затем читаете, я бы сказал, что то, что вы здесь представляете, является плохой моделью, но это трудно посоветовать вам больше

2. Что вы на самом деле пытаетесь сделать ? Этот пример не имеет смысла.

3. Я пытаюсь найти ключ, который мог бы существовать в двух разных хэш-картах. В этом примере я ищу 11 в обоих наборах ключей и хочу завершить оба потока, когда найду его в любом из наборов ключей.

Ответ №1:

Я пытаюсь найти ключ, который мог бы существовать в двух разных хэш-картах. В этом примере я ищу 11 в обоих наборах ключей и хочу завершить оба потока, когда найду его в любом из наборов ключей.

В этом случае нет причин использовать mpsc для передачи условия остановки. Вы можете использовать простой атомарный bool:

 use std::array::IntoIter;
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let m1 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]));
    let m2 = HashMap::<_, _>::from_iter(IntoIter::new([
        (1, 2),
        (3, 4),
        (5, 6),
        (7, 8),
        (9, 10),
        (11, 12),
        (13, 14),
        (15, 16),
        (17, 18),
        (19, 20),
    ]));

    let stop_signal = Arc::new(AtomicBool::new(false));

    let stop = stop_signal.clone();
    let h1 = thread::spawn(move || {
        let keys = m1.keys();
        for amp;k in keys {
            if stop.load(Ordering::Relaxed) {
                println!("Another thread found it!");
                break;
            }

            if k == 11u8 {
                stop.store(true, Ordering::Relaxed);
                // do something with the found key
                println!("Found by thread 1");
                break;
            }
        }
        m1
    });

    let stop = stop_signal.clone();
    let h2 = thread::spawn(move || {
        let keys = m2.keys();
        for amp;k in keys {
            if stop.load(Ordering::Relaxed) {
                println!("Another thread found it!");
                break;
            }

            if k == 11u8 {
                stop.store(true, Ordering::Relaxed);
                // do something with the found key
                println!("Found by thread 2");
                break;
            }
        }
        m2
    });

    h1.join().unwrap();
    h2.join().unwrap();
}

 

В вашем исходном коде было несколько проблем:

  • Один из потоков оставался бы живым даже тогда, когда он закончил работу со своей картой, пока не получил сообщение.
  • Даже если бы один из участников потока нашел ключ, другой все равно продолжал бы его искать
  • Нет смысла делать thread::sleep() это в цикле. Это ничего не дает, кроме замедления работы приложения