#multithreading #rust #concurrency
Вопрос:
Я хочу эффективно искать ключи двух хэш-карт для одного значения и завершать оба потока, как только значение будет найдено. В настоящее время я делаю это, используя два отдельных канала сообщений (т. Е. Два передатчика и два приемника), но я не уверен, что это правильный подход. Учитывая, что компонент «mpsc» mpsc::channel
означает «несколько производителей, один потребитель», кажется неправильным иметь несколько производителей и нескольких потребителей. Итак, есть ли лучший способ одновременного поиска в двух массивах?
Мой код также доступен на игровой площадке:
use std::collections::HashMap;
use std::array::IntoIter;
use std::thread;
use std::time::Duration;
use std::iter::FromIterator;
use std::sync::mpsc;
fn main() {
let m1 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7,8), (9, 10)]));
let m2 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7,8), (9, 10), (11, 12), (13, 14), (15, 16), (17,18), (19, 20)]));
let (tx1, rx1) = mpsc::channel::<u8>();
let (tx2, rx2) = mpsc::channel::<u8>();
let handle1 = thread::spawn(move || {
let iter_keys1 = m1.keys();
for k in iter_keys1 {
if k.clone() == 11u8 {
tx2.send(*k);
break
} else {
println!("Key from handle1: {}", k);
}
thread::sleep(Duration::from_millis(1));
}
for received in rx1 {
let into: u8 = received;
if into == 11u8 {
println!("handle2 sent a message to receiver1: {}", into);
break
}
}
m1
});
let handle2 = thread::spawn(move || {
let iter_keys2 = m2.keys();
for k in iter_keys2 {
if k.clone() == 11u8 {
tx1.send(*k);
break
} else {
println!("Key from handle2: {}", k);
}
thread::sleep(Duration::from_millis(1));
}
for received in rx2 {
let into: u8 = received;
if into == 11u8 {
println!("handle1 sent a message to receiver2: {}", into);
break
}
}
m2
});
handle1.join().unwrap();
handle2.join().unwrap();
}
Несколько связанный с этим вопрос: Есть ли практическая причина для использования sleep
, или это просто облегчает просмотр результатов параллельной обработки на небольших выборках? Когда я комментирую thread::sleep(Duration::from_millis(1));
строки, кажется, что потоки обрабатываются последовательно:
Key from handle1: 9
Key from handle1: 5
Key from handle1: 3
Key from handle1: 1
Key from handle1: 7
Key from handle2: 1
handle2 sent a message to receiver1: 11
Осветление:
Я пытаюсь найти ключ, который мог бы существовать в двух разных хэш-картах. В этом примере я ищу 11 в обоих наборах ключей и хочу завершить оба потока, когда найду его в любом из наборов ключей.
Комментарии:
1. Поскольку вы пишете, а затем читаете, я бы сказал, что то, что вы здесь представляете, является плохой моделью, но это трудно посоветовать вам больше
2. Что вы на самом деле пытаетесь сделать ? Этот пример не имеет смысла.
3. Я пытаюсь найти ключ, который мог бы существовать в двух разных хэш-картах. В этом примере я ищу 11 в обоих наборах ключей и хочу завершить оба потока, когда найду его в любом из наборов ключей.
Ответ №1:
Я пытаюсь найти ключ, который мог бы существовать в двух разных хэш-картах. В этом примере я ищу 11 в обоих наборах ключей и хочу завершить оба потока, когда найду его в любом из наборов ключей.
В этом случае нет причин использовать mpsc
для передачи условия остановки. Вы можете использовать простой атомарный bool:
use std::array::IntoIter;
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let m1 = HashMap::<_, _>::from_iter(IntoIter::new([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]));
let m2 = HashMap::<_, _>::from_iter(IntoIter::new([
(1, 2),
(3, 4),
(5, 6),
(7, 8),
(9, 10),
(11, 12),
(13, 14),
(15, 16),
(17, 18),
(19, 20),
]));
let stop_signal = Arc::new(AtomicBool::new(false));
let stop = stop_signal.clone();
let h1 = thread::spawn(move || {
let keys = m1.keys();
for amp;k in keys {
if stop.load(Ordering::Relaxed) {
println!("Another thread found it!");
break;
}
if k == 11u8 {
stop.store(true, Ordering::Relaxed);
// do something with the found key
println!("Found by thread 1");
break;
}
}
m1
});
let stop = stop_signal.clone();
let h2 = thread::spawn(move || {
let keys = m2.keys();
for amp;k in keys {
if stop.load(Ordering::Relaxed) {
println!("Another thread found it!");
break;
}
if k == 11u8 {
stop.store(true, Ordering::Relaxed);
// do something with the found key
println!("Found by thread 2");
break;
}
}
m2
});
h1.join().unwrap();
h2.join().unwrap();
}
В вашем исходном коде было несколько проблем:
- Один из потоков оставался бы живым даже тогда, когда он закончил работу со своей картой, пока не получил сообщение.
- Даже если бы один из участников потока нашел ключ, другой все равно продолжал бы его искать
- Нет смысла делать
thread::sleep()
это в цикле. Это ничего не дает, кроме замедления работы приложения