Как установить правильное время жизни `self` при вызове tokio ::spawn?

#rust #rust-tokio

#Ржавчина #rust-tokio

Вопрос:

Я работаю над типом, реализующим Stream признак. В основном он принимает два других потока, получает next() из них значения, а затем возвращает next() само значение на основе некоторых условий. Я дошел до того, что настроил код следующим образом:

 impl Stream for HeartbeatStream {
  type Item = char;

  fn poll_next(mut self: Pin<amp;mut Self>, cx: amp;mut Context<'_>) -> Poll<Option<Self::Item>> {
    // ...
    // Dealing with heartbeat and no heartbeat
    match amp;mut self.next_heartbeat {
      Some(hb) => {
        Pin::new(hb).poll(cx);
        tokio::spawn(async move {
          tokio::select! {
            Some(char) = stream1.next(), if self.cooldown.map_or(true, |cd| since_last_signal > cd) => {
              self.reset_last_signal_next_heartbeat();
              return Poll::Ready(Some(char))
            },
            Some(char) = stream2.next(), if self.cooldown.map_or(true, |cd| since_last_signal > cd) => {
              self.reset_last_signal_next_heartbeat();
              return Poll::Ready(Some(char))
            },
            _ = hb => {
              self.reset_last_signal_next_heartbeat();
              return Poll::Ready(Some(self.char))
            },
            else => Poll::Ready(None),
          }
        });
      },
      None => {
        ...
      }
    }

    Poll::Pending
  }
}
  

Здесь показан полный код.

Когда я компилирую это, я получаю сообщение об ошибке:

 error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src/heartbeat.rs:59:14
   |
59 |         match amp;mut self.next_heartbeat {
   |                    ^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 52:2...
  --> src/heartbeat.rs:52:2
   |
52 |       fn poll_next(mut self: Pin<amp;mut Self>, cx: amp;mut Context<'_>) -> Poll<Option<Self::Item>> {
   |  _____^
53 | |         let mut stream1 = self.streams[0];
54 | |         let mut stream2 = self.streams[1];
55 | |         let since_last_signal = Instant::now().saturating_duration_since(
...  |
97 | |         Poll::Pending
98 | |     }
   | |_____^
note: ...so that the reference type `amp;mut std::pin::Pin<amp;mut heartbeat::HeartbeatStream>` does not outlive the data it points at
  --> src/heartbeat.rs:59:14
   |
59 |         match amp;mut self.next_heartbeat {
   |                    ^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `impl std::future::Future` will meet its required lifetime bounds
  --> src/heartbeat.rs:62:5
   |
62 |                 tokio::spawn(async move {
   |                 ^^^^^^^^^^^^

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/heartbeat.rs:62:29
   |
62 |                   tokio::spawn(async move {
   |  _________________________________________^
63 | |                     tokio::select! {
64 | |                         Some(char) = stream1.next(), if self.cooldown.map_or(true, |cd| since_last_signal > cd) => {
65 | |                             self.reset_last_signal_next_heartbeat();
...  |
77 | |                     }
78 | |                 });
   | |_________________^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 52:2...
  --> src/heartbeat.rs:52:2
   |
52 |       fn poll_next(mut self: Pin<amp;mut Self>, cx: amp;mut Context<'_>) -> Poll<Option<Self::Item>> {
   |  _____^
53 | |         let mut stream1 = self.streams[0];
54 | |         let mut stream2 = self.streams[1];
55 | |         let since_last_signal = Instant::now().saturating_duration_since(
...  |
97 | |         Poll::Pending
98 | |     }
   | |_____^
note: ...so that the types are compatible
  --> src/heartbeat.rs:62:29
   |
62 |                   tokio::spawn(async move {
   |  _________________________________________^
63 | |                     tokio::select! {
64 | |                         Some(char) = stream1.next(), if self.cooldown.map_or(true, |cd| since_last_signal > cd) => {
65 | |                             self.reset_last_signal_next_heartbeat();
...  |
77 | |                     }
78 | |                 });
   | |_________________^
   = note: expected `std::pin::Pin<amp;mut heartbeat::HeartbeatStream>`
              found `std::pin::Pin<amp;mut heartbeat::HeartbeatStream>`
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `impl std::future::Future` will meet its required lifetime bounds
  --> src/heartbeat.rs:62:5
   |
62 |                 tokio::spawn(async move {
   |                 ^^^^^^^^^^^^
  

Здесь показан полный код ошибки.

Я вижу из tokio doc, что обычно это можно решить путем асинхронного перемещения и перемещения переменной за пределы tokio::spawn() , которая будет принадлежать внутри асинхронного кода. Но в этом случае он жалуется на self объект.

Что я могу сделать, чтобы решить эту проблему?

Спасибо, что пролили свет 🙏

Комментарии:

1. Перемещение ссылки не помогает — это все еще ссылка. Все, что перемещается в новую задачу, не может содержать ссылки, потому что компилятор не может знать, когда (или если) задача будет завершена. Вот откуда возникает 'static требование.

Ответ №1:

Компилятор Rust не знает tokio::spawn() , присоединится ли перед self удалением. Даже если tokio::spawn() возвращает a JoinHandle , который может быть передан в a std::mem::forget() и никогда не будет присоединен, что делает закрытие доступным для удаленной ссылки self (что небезопасно). Вот почему компилятор применяет 'static время жизни. В общем случае «гарантии безопасности Rust не включают гарантию того, что деструкторы будут выполняться всегда» (источник). Я бы посоветовал вам вызывать напрямую poll_next() вместо next() .