#rust #rust-tokio
#Ржавчина #rust-tokio
Вопрос:
У меня есть программа, похожая на следующую:
struct MyEvent { /* some fields */ }
struct MyStruct { /* some fields */ }
struct MyStreamer { /* holds some state */ }
impl MyStreamer {
pub fn stream_objects<'a, 'b: 'a>(
amp;'a self,
event_stream: Pin<Box<dyn Stream<Item = MyEvent> Send 'b>>,
) -> Pin<Box<dyn Stream<Item = Arc<MyStruct>> Send 'a>> { /* implementation */ }
}
Цель состоит в том, чтобы обрабатывать события и создавать поток MyStruct
. Затем у меня есть два потребителя для потока MyStruct
, и я изо всех сил пытаюсь его дублировать.
Я пытаюсь написать следующую функцию (также см. Мою попытку реализации):
pub fn duplicate_stream<'a, 'b: 'a>(
amp;'a self,
struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>> Send 'b>>,
) -> (
Pin<Box<dyn Stream<Item = Arc<MyStruct>> Send 'b>>,
Pin<Box<dyn Stream<Item = Arc<MyStruct>> Send 'b>>
) {
let (s1, r1) = mpsc::unbounded::<Arc<MyStruct>>();
let (s2, r2) = mpsc::unbounded::<Arc<MyStruct>>();
let s = s1.fanout(s2);
let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
(r1.boxed(), r2.boxed())
}
На данный момент мне сказали следующее:
|
155 | struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>> Send 'b>>,
| ----------------------------------------------- this data with lifetime `'b`...
...
165 | let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...is captured here...
|
note: ...and is required to live as long as `'static` here
--> *file name here*
|
165 | let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
| ^^^^^^^^^^^^
Я могу удалить время жизни, но затем static
будет выведено, и я получаю ошибки в вызывающих абонентах.
Мне любопытно понять, какой наилучший способ — клонировать все элементы потока и получить два идентичных потока. Использование tokio::spawn
mpsc
каналов и, похоже, требует изменения большого времени жизни на static
.
Комментарии:
1. Я думаю, что «дублировать» — неправильный термин для того, что вы ищете. Возможно, «разделить».
2. @Elazar Я думал об использовании «разделения», но почувствовал, что это подразумевает, что я хочу, чтобы некоторые элементы потока переходили в поток 1, а некоторые — в поток 2.
3. @RaduSzasz Два идентичных потока, как в
a.next()
иb.next()
, будут одинаковыми, илиa
иb
оба совместно используют базовый поток, и вызов.next()
одного из них продвинет оба (аналогичноIterator::by_ref
)?4. @Aplet123
a.next()
иb.next()
должен возвращать то же самое. Предположимstruct_stream = stream::iter!(vec![1, 2, 3].into_iter())
, иlet (s1, s2) = duplicate_stream(struct_stream)
. Затемs1.next()
возвращается1
. То же самое,s2.next()
возвращает1
.
Ответ №1:
Я думаю, что это проблема XY
Компилятор правильный, и вам, вероятно, следует подумать о времени жизни, потому что разветвление в порядке.
tokio::spawn
требуется 'static
, и у вас есть 'b
время жизни, указанное для вашего struct_stream
. Может быть, обернуть struct_stream
в Arc/Rc
?
Комментарии:
1. Спасибо за ваш ответ! Мой вопрос касается строго «У меня есть один поток, и я хочу иметь два потока. Как мне это сделать? » Не стесняйтесь предлагать лучшую формулировку, чтобы было понятно. Обычно я включаю в свой вопрос то, что я уже пробовал, чтобы было очевидно, что я не просто сразу же опубликовал в StackOverflow, как только столкнулся с проблемой, и люди знают путь, по которому я шел до сих пор. Я не ожидаю, что ответы, которые я получаю, будут определять время жизни (если только это не лучший способ ответить на вопрос).
2. @RaduSzasz и я думаю, что я ответил в последней части своего комментария. Я думаю (я не лучший rustacean, но большую часть использовал tokio), что то, что вы сделали (используйте fanout), является правильным подходом, который не работает, потому что вы неправильно используете время жизни, и я предположил, что одним из способов приблизиться к несоответствию времени жизни было бы обернуть
struct_stream
с помощью ref count . Ваш подход работает не потому, что он неправильный, а потому, что код неверен.