Вложенные циклы с «Отправителем» ржавчины и «for_each_with ()» района

#multithreading #rust #rayon

Вопрос:

У меня есть многопоточный код, который предполагает использование вложенного цикла, где внутренний выполняется параллельно. «Общий доступ» в каждом потоке Sender -это функция, которая будет возвращать результаты. В частности Sender , реализует Send , так что не должно быть никаких проблем с его клонированием и отправкой с использованием <url for_each_with() > . Однако компиляция этого кода:

 use std::sync::mpsc::channel;
use rayon::prelude::*;

fn main(){
    let (sender, receiver) = channel();

    (0..5).for_each(|i|{
        (0..5).into_par_iter().for_each_with(amp;sender, |sender, j|{
            sender.send(i   j).unwrap();   
        });
    });
}
 

Дает мне:

 8 |         (0..5).into_par_iter().for_each_with(amp;sender, |sender, j|{
  |                                              ^^^^^^^ `Sender<_>` cannot be shared between threads safely
  |
  = help: the trait `Sync` is not implemented for `Sender<_>`
  = note: required because of the requirements on the impl of `Send` for `amp;Sender<_>`
 

(Детская площадка)

Теперь я подумал, что это может быть связано с тем, что я пытаюсь клонировать ссылку, но если я перенесу фактическое sender в for_each_with() (т. Е. for_each_with(sender, ...) ), оно будет использовано первой итерацией внешнего цикла:

 error[E0507]: cannot move out of `sender`, a captured variable in an `FnMut` closure
 

(Детская площадка).

Как я могу реализовать этот шаблон таким образом, чтобы он удовлетворял компилятору Rust?

Комментарии:

1. просто клонировать play.rust-lang.org/…

2. предупреждение о стиле, for_each это плохо и не имеет смысла, используйте для цикла, мир благодарит вас.

3. Я не согласен, я нахожу for_each гораздо более читабельным, как часть длинной цепочки методов итератора. И в любом случае я использую try_for_each и for_each_with которые добавляют дополнительную функциональность, не подходящую для for цикла.

4. Хорошо, что клонирование исправляет это, но я хочу знать: а) зачем мне это нужно, и б) не означает ли это, что я клонирую n x m m раз, n для внутреннего цикла и m для внешнего цикла? Потому for_each_with что предполагается клонировать init аргумент, означающий, что у нас есть n x m клоны, но вызов clone напрямую добавляет другие m клоны. Это кажется расточительным

5. Я не могу ответить на вопрос о смысле, когда код не имеет смысла, да, клона можно было бы избежать, если for_each_with вернуть элемент инициализации. Но это, вероятно, не так, потому что ваш первый цикл не имеет смысла в случае с миром чтения. Я не использовал вискозу, и мне это не очень нравится. также клонировать отправителя дешево. «И в любом случае я использую try_for_each и for_each_, которые добавляют дополнительную функциональность, не подходящую для цикла for». мой совет был только для for_each std. «почему мне нужно это сделать», если вы отправите amp;sender запрос в район, чтобы клонировать ссылку, и, как компилятор, скажете, что небезопасно делиться в потоке.

Ответ №1:

AFAIK, район использует пул потоков, это означает, что району требуется реализовать элемент Send , потому что он отправит его в поток. вискоза клонируется не для каждого элемента, а для каждой нити:

 fn for_each_with<OP, T>(self, init: T, op: OP) where
    OP: Fn(amp;mut T, Self::Item)   Sync   Send,
    T: Send   Clone,
 

Мы видим OP , что взятие а amp;mut T не а T . Это означает for_each_with() , что клон для каждого числа потоков, используемых не для количества продуктов изделия.

Ссылку нужно внедрить Sync , чтобы внедрить Send . Sender это определить, чтобы не реализовывать Sync . Я все равно не знаю подробностей этого выбора, это означает amp;Sender , что не может быть разделено между потоками. Я не думаю, что есть решение для устранения этого ограничения.

Но если вы хотите, вы можете использовать поперечную балку-канал, который реализует Sync :

 use crossbeam_channel::unbounded; // 0.5.1
use rayon::prelude::*; // 1.5.1

fn main() {
    let (sender, _receiver) = unbounded();

    for i in 0..5 {
        (0..5).into_par_iter().for_each_with(amp;sender, |sender, j| {
            sender.send(i   j).unwrap();
        });
    }
}
 

было бы хорошо скомпилировать. Бонус в том, что поперечный канал утверждает, что он быстрее. Тем не менее, клонирование полностью подходит для ЗППП Sender :

 use rayon::prelude::*;
use std::sync::mpsc::channel;

fn main() {
    let (sender, _receiver) = channel();

    for i in 0..5 {
        let sender = sender.clone();
        (0..5).into_par_iter().for_each_with(sender, |sender, j| {
            sender.send(i   j).unwrap();
        });
    }
}
 

Это действительно O(n) увеличило бы время клонирования, но Sender из-за зппп клонировать много значит. (На самом деле это, вероятно, просто добавить один клон, потому что вы решили сделать вложенный цикл, код, вероятно, не клонирует последний, а просто дает ему последний поток, и поэтому вы просто слишком сильно клонируете его, насколько мы можем проверить)

В любом случае, все ваши проблемы возникают из-за странной ситуации, следует сгладить итерацию, например:

 use rayon::prelude::*;
use std::sync::mpsc::channel;

fn main() {
    let (sender, _recever) = channel();

    (0..5)
        .into_par_iter()
        .flat_map_iter(|i| (0..5).map(move |j| (i, j)))
        .for_each_with(sender, |sender, (i, j)| {
            sender.send(i   j).unwrap();
        });
}
 

Вы также могли бы подумать о том, чтобы не использовать Sender вискозу, вероятно, для использования, collect() и вместо отправки товара вы могли бы просто забрать их в конце.

Комментарии:

1. Здесь много замечательных объяснений и решений, спасибо. Итак, если Район клонируется только один раз в потоке, означает ли это, что использование .clone() приведет только к n t клонам, где n количество итераций во внешнем цикле и t количество потоков? Потому что это вполне терпимо.

2. @Migwell нет , это n * t (вероятно) потому, что вы делаете n итераций, где вы клонируете (t — 1) раз, но вы клонируете себя 1 раз, так n * (t - 1 1) что, поскольку вы на самом деле не использовали исходного отправителя, вы клонируете слишком много. Опять же, АФАИК, может быть, район делает это не совсем так.

3. @Migwell На самом деле мы можем немного это проверить play.rust-lang.org/…

Ответ №2:

Sender сделано для клонирования, это буквально в документах. У каждого дочернего потока должен быть свой собственный отправитель.

Комментарии:

1.Моя проблема немного более тонкая, чем это. Я понимаю, что отправителя нужно клонировать, и рад это сделать, но for_each_with делает клонирование за меня, поэтому любое дополнительное клонирование, которое я делаю, излишне.