#rust
Вопрос:
Я работаю с rdkafka и serde и пытаюсь переписать свой код во что-то более элегантное, чтобы избежать уродливых вложенных совпадений для обработки сценариев ошибок. Я придумал следующее, но изо всех сил пытаюсь понять, как заставить его работать.
То, что я пытаюсь сделать, и, возможно, ошибочная идея, я новичок в Rust, заключается в том, чтобы выполнить всю логику чтения сообщения, чтения полезной нагрузки, обработки полезной нагрузки путем чтения JSON, и в любой момент, если возникнет ошибка, я просто хочу выйти из системы и перейти к следующему сообщению в Кафке.
В моей первой итерации я просто использовал unwrap, чтобы заставить что-то работать, затем переместился, чтобы соответствовать любому месту, куда могла вернуться ошибка, но это стало грязным и почему-то выглядело хуже, чем версия Go. Я пытаюсь найти более чистый подход.
Это ошибка, которую я получаю…
134 | let res: Result<(), Box<dyn Error>> = msg_result.and_then(|msg| {
| ______________________--------------------------___^
| | |
| | expected due to this
135 | | msg.payload().ok_or(Err(Box::new("empty message")))
136 | | .and_then(|payload| {
137 | | serde_json::from_slice(payload)
... |
150 | | Ok(())
151 | | });
| |______________^ expected struct `Box`, found enum `KafkaError`
|
= note: expected enum `Result<_, Box<dyn StdError>>`
found enum `Result<_, KafkaError>`
И мой фрагмент кода
pub fn consume_cms_documents() {
let consumer: BaseConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("enable.auto.commit", "false")
.set("group.id", "experimentdocument")
.set("auto.offset.reset", "earliest")
.set_log_level(RDKafkaLogLevel::Debug)
.create()
.expect("invalid consumer config");
consumer.subscribe(amp;["CmsDocuments"])
.expect("topic subscribe failed");
loop {
for msg_result in consumer.iter() {
let res: Result<(), Box<dyn Error>> = msg_result.and_then(|msg| {
msg.payload().ok_or(Err(Box::new("empty message")))
.and_then(|payload| {
serde_json::from_slice(payload)
.and_then(|e: CmsEvent| {
if e.r#type.eq(EXPERIMENT_DOCUMENT_TYPE) {
serde_json::from_value(e.datasource)
.and_then(|exp_doc| {
println!("Document: {:?}", exp_doc);
// todo: call something here to actually process the message and store it
Ok(())
});
}
Ok(())
});
Ok(())
});
Ok(())
});
match res {
Ok(_) => (),
Err(e) => println!("failed to process kafka message {:?}", e)
}
}
}
}
Комментарии:
1. Вы пробовали обернуть код анализа полезной нагрузки вашего метода в его собственный метод с типом возвращаемого
Result<(), Box<dyn std::error::Error>
значения и использовать ? оператор для передачи ошибок, которые вы можете сопоставить со своимиmatch res
?2. Возможно, что-то вроде этого (не проверено): pastebin.com/3LsAd2gh
3. Я предлагаю рассмотреть
anyhow
возможность унификации типов ошибок.4. О, и
map_for
чтобы упростить ваш вложенныйand_then
каскад (отказ от ответственности: я написалmap_for
).5. Это самый читаемый код, который я видел за последнее время. Функциональный стиль иногда упрощает вещи, но в других случаях он делает их ужасающими. Почему вы просто не использовали
?
для раннего распространения ошибок ?