Преобразовать C callback API в Stream

#rust

#Ржавчина

Вопрос:

Ссылка на игровую площадку

У меня есть доступ к pulseaudio через асинхронный C api.

 pub fn get_sink_info_by_index<F>(
    amp;self,
    index: u32,
    callback: F
) -> Operation<dyn FnMut(ListResult<amp;SinkInfo>)> where
    F: FnMut(ListResult<amp;SinkInfo>)   'static,
  

Источник: https://docs.rs/libpulse-binding/2.16.1/libpulse_binding/context/introspect/struct .Introspector.html#метод.get_sink_info_by_index

Я реализовал вспомогательную функцию (которая отлично компилируется), которая генерирует обратный вызов для передачи функции C и выходного потока.

 pub fn callbackStream<T:Sized   Clone>() -> (Box<dyn FnMut(ListResult<amp;'static T>)>, Box<dyn Stream<Item=T>>) {
    let (sender, recv) = channel(1024); // TODO channel size?
    let cb  = {|c: ListResult<amp;'static T>| match c {
        Item(it) => match sender.try_send(it.clone()) {
            Ok(_) => (),
            Err(err) => () // TODO
        },
        End => (),
        Error => () // TODO
    }};
    (Box::new(cb), Box::new(recv))
}
  

Однако, при попытке его использовать, я не могу заставить время жизни работать:

     fn stream_sink_info_by_index(
        amp;self,
        index: u32
    ) -> Box<dyn Stream<Item=SinkInfo>> {
        let (callback, stream): (Box<dyn FnMut(ListResult<amp;SinkInfo>)   'static>, Box<dyn Stream<Item=SinkInfo>>) = callbackStream();
        self.get_sink_info_by_index(index, callback);
        stream
    }
  

дает мне

 error[E0308]: mismatched types
  --> src/futuristicPulse.rs:31:117
   |
31 |         let (callback, stream): (Box<dyn FnMut(ListResult<amp;SinkInfo>)   'static>, Box<dyn Stream<Item=SinkInfo>>) = callbackStream();
   |                                 ---------------------------------------------------------------------------------   ^^^^^^^^^^^^^^^^ expected concrete lifetime, found bound lifetime parameter
   |                                 |
   |                                 expected due to this
   |
   = note: expected tuple `(std::boxed::Box<(dyn for<'r, 's> std::ops::FnMut(pulse::callbacks::ListResult<amp;'r pulse::context::introspect::SinkInfo<'s>>)   'static)>, std::boxed::Box<dyn futures_core::stream::Stream<Item = pulse::context::introspect::SinkInfo<'_>>>)`
              found tuple `(std::boxed::Box<(dyn std::ops::FnMut(pulse::callbacks::ListResult<amp;'static _>)   'static)>, std::boxed::Box<(dyn futures_core::stream::Stream<Item = _>   'static)>)`

error: aborting due to previous error; 10 warnings emitted
  

Если я изменю аннотацию типа на (callback, stream) в соответствии с тем, что находит программа проверки типов, я ошибка перенаправляется на вызов на get_sink_info_by_index :

 error[E0271]: type mismatch resolving `for<'r, 's> <std::boxed::Box<dyn std::ops::FnMut(pulse::callbacks::ListResult<amp;'static pulse::context::introspect::SinkInfo<'_>>)> as std::ops::FnOnce<(pulse::callbacks::ListResult<amp;'r pulse::context::introspect::SinkInfo<'s>>,)>>::Output == ()`
  --> src/futuristicPulse.rs:32:14
   |
32 |         self.get_sink_info_by_index(index, callback);
   |              ^^^^^^^^^^^^^^^^^^^^^^ expected bound lifetime parameter, found concrete lifetime

error: aborting due to previous error; 10 warnings emitted
  

Комментарии:

1. Вы пробовали объединить callbackStream и stream_sink_info_by_index в одну функцию? Я думаю, это могло бы немного упростить ситуацию.

2. Я, конечно, мог бы попробовать, но у меня есть примерно 8 других функций, подобных get_sink_info_by_index , так что немного абстракции было бы неплохо.

3. Достаточно справедливо. Почему бы вам не попробовать заменить тип в let (callback, stream): xyz на тип после «найден кортеж …» из сообщения об ошибке?

4. Сделал, добавил это в текст вопроса.

Ответ №1:

Очень грубый ответ — проблема действительно заключалась в it.clone() . Это неправильно передало права собственности, потому что это было SinkInfo<'r> вместо просто SinkInfo . Я написал SinkInfo<'r> -> SinkInfo<'static> , и теперь все работает.