Как дублировать поток Rust

#rust #rust-tokio

#Ржавчина #rust-tokio

Вопрос:

У меня есть программа, похожая на следующую:

 struct MyEvent { /* some fields */ }
struct MyStruct { /* some fields */ }
struct MyStreamer { /* holds some state */ }

impl MyStreamer {
    pub fn stream_objects<'a, 'b: 'a>(
        amp;'a self,
        event_stream: Pin<Box<dyn Stream<Item = MyEvent>   Send   'b>>,
    ) -> Pin<Box<dyn Stream<Item = Arc<MyStruct>>   Send   'a>> { /* implementation */ }
}
 

Цель состоит в том, чтобы обрабатывать события и создавать поток MyStruct . Затем у меня есть два потребителя для потока MyStruct , и я изо всех сил пытаюсь его дублировать.

Я пытаюсь написать следующую функцию (также см. Мою попытку реализации):

 pub fn duplicate_stream<'a, 'b: 'a>(
    amp;'a self,
    struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>>   Send   'b>>,
) -> (
   Pin<Box<dyn Stream<Item = Arc<MyStruct>>   Send   'b>>,
   Pin<Box<dyn Stream<Item = Arc<MyStruct>>   Send   'b>>
) {
        let (s1, r1) = mpsc::unbounded::<Arc<MyStruct>>();
        let (s2, r2) = mpsc::unbounded::<Arc<MyStruct>>();

        let s = s1.fanout(s2);

        let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });

        (r1.boxed(), r2.boxed())
}
 

На данный момент мне сказали следующее:

     |
155 |         struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>>   Send   'b>>,
    |                               ----------------------------------------------- this data with lifetime `'b`...
...
165 |         let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
    |                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...is captured here...
    |
note: ...and is required to live as long as `'static` here
   --> *file name here*
    |
165 |         let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
    |                       ^^^^^^^^^^^^
 

Я могу удалить время жизни, но затем static будет выведено, и я получаю ошибки в вызывающих абонентах.

Мне любопытно понять, какой наилучший способ — клонировать все элементы потока и получить два идентичных потока. Использование tokio::spawn mpsc каналов и, похоже, требует изменения большого времени жизни на static .

Комментарии:

1. Я думаю, что «дублировать» — неправильный термин для того, что вы ищете. Возможно, «разделить».

2. @Elazar Я думал об использовании «разделения», но почувствовал, что это подразумевает, что я хочу, чтобы некоторые элементы потока переходили в поток 1, а некоторые — в поток 2.

3. @RaduSzasz Два идентичных потока, как в a.next() и b.next() , будут одинаковыми, или a и b оба совместно используют базовый поток, и вызов .next() одного из них продвинет оба (аналогично Iterator::by_ref )?

4. @Aplet123 a.next() и b.next() должен возвращать то же самое. Предположим struct_stream = stream::iter!(vec![1, 2, 3].into_iter()) , и let (s1, s2) = duplicate_stream(struct_stream) . Затем s1.next() возвращается 1 . То же самое, s2.next() возвращает 1 .

Ответ №1:

Я думаю, что это проблема XY

Компилятор правильный, и вам, вероятно, следует подумать о времени жизни, потому что разветвление в порядке.

tokio::spawn требуется 'static , и у вас есть 'b время жизни, указанное для вашего struct_stream . Может быть, обернуть struct_stream в Arc/Rc ?

Комментарии:

1. Спасибо за ваш ответ! Мой вопрос касается строго «У меня есть один поток, и я хочу иметь два потока. Как мне это сделать? » Не стесняйтесь предлагать лучшую формулировку, чтобы было понятно. Обычно я включаю в свой вопрос то, что я уже пробовал, чтобы было очевидно, что я не просто сразу же опубликовал в StackOverflow, как только столкнулся с проблемой, и люди знают путь, по которому я шел до сих пор. Я не ожидаю, что ответы, которые я получаю, будут определять время жизни (если только это не лучший способ ответить на вопрос).

2. @RaduSzasz и я думаю, что я ответил в последней части своего комментария. Я думаю (я не лучший rustacean, но большую часть использовал tokio), что то, что вы сделали (используйте fanout), является правильным подходом, который не работает, потому что вы неправильно используете время жизни, и я предположил, что одним из способов приблизиться к несоответствию времени жизни было бы обернуть struct_stream с помощью ref count . Ваш подход работает не потому, что он неправильный, а потому, что код неверен.