Ошибки гнезда обработки ржавчины — ожидаемая структура » Коробка`, найдено перечисление

#rust

Вопрос:

Я работаю с rdkafka и serde и пытаюсь переписать свой код во что-то более элегантное, чтобы избежать уродливых вложенных совпадений для обработки сценариев ошибок. Я придумал следующее, но изо всех сил пытаюсь понять, как заставить его работать.

То, что я пытаюсь сделать, и, возможно, ошибочная идея, я новичок в Rust, заключается в том, чтобы выполнить всю логику чтения сообщения, чтения полезной нагрузки, обработки полезной нагрузки путем чтения JSON, и в любой момент, если возникнет ошибка, я просто хочу выйти из системы и перейти к следующему сообщению в Кафке.

В моей первой итерации я просто использовал unwrap, чтобы заставить что-то работать, затем переместился, чтобы соответствовать любому месту, куда могла вернуться ошибка, но это стало грязным и почему-то выглядело хуже, чем версия Go. Я пытаюсь найти более чистый подход.

Это ошибка, которую я получаю…

 134 |               let res: Result<(), Box<dyn Error>> = msg_result.and_then(|msg| {
    |  ______________________--------------------------___^
    | |                      |
    | |                      expected due to this
135 | |                 msg.payload().ok_or(Err(Box::new("empty message")))
136 | |                     .and_then(|payload| {
137 | |                         serde_json::from_slice(payload)
...   |
150 | |                 Ok(())
151 | |             });
    | |______________^ expected struct `Box`, found enum `KafkaError`
    |
    = note: expected enum `Result<_, Box<dyn StdError>>`
               found enum `Result<_, KafkaError>`
 

И мой фрагмент кода

 pub fn consume_cms_documents() {

    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.auto.commit", "false")
        .set("group.id", "experimentdocument")
        .set("auto.offset.reset", "earliest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create()
        .expect("invalid consumer config");

    consumer.subscribe(amp;["CmsDocuments"])
        .expect("topic subscribe failed");

    loop {
        for msg_result in consumer.iter() {
            let res: Result<(), Box<dyn Error>> = msg_result.and_then(|msg| {
                msg.payload().ok_or(Err(Box::new("empty message")))
                    .and_then(|payload| {
                        serde_json::from_slice(payload)
                            .and_then(|e: CmsEvent| {
                                if e.r#type.eq(EXPERIMENT_DOCUMENT_TYPE) {
                                    serde_json::from_value(e.datasource)
                                        .and_then(|exp_doc| {
                                            println!("Document: {:?}", exp_doc);
                                            // todo: call something here to actually process the message and store it
                                            Ok(())
                                        });
                                }
                                Ok(())
                            });
                        Ok(())
                    });
                Ok(())
            });

            match res {
                Ok(_) => (),
                Err(e) => println!("failed to process kafka message {:?}", e)
            }
        }
    }
}
 

Комментарии:

1. Вы пробовали обернуть код анализа полезной нагрузки вашего метода в его собственный метод с типом возвращаемого Result<(), Box<dyn std::error::Error> значения и использовать ? оператор для передачи ошибок, которые вы можете сопоставить со своими match res ?

2. Возможно, что-то вроде этого (не проверено): pastebin.com/3LsAd2gh

3. Я предлагаю рассмотреть anyhow возможность унификации типов ошибок.

4. О, и map_for чтобы упростить ваш вложенный and_then каскад (отказ от ответственности: я написал map_for ).

5. Это самый читаемый код, который я видел за последнее время. Функциональный стиль иногда упрощает вещи, но в других случаях он делает их ужасающими. Почему вы просто не использовали ? для раннего распространения ошибок ?