#python #concurrent.futures
Вопрос:
У меня есть этот код:
done, not_done = concurrent.futures.wait([reader, writer], return_when=concurrent.futures.FIRST_EXCEPTION)
self.logger.info("DONE: NOT DONE:", done=done, not_done=not_done)
где читатель и писатель-это соединения с ресурсом сообщений.
Я хочу, чтобы это вызывало ошибку, когда done
возникает ошибка. Как мне это сделать?
Но это то, что происходит, когда возникает ошибка:
{"done": "{<Future at 0x1031c6c10 state=finished raised KafkaException>}", "not_done": "{<Future at 0x1031c6810 state=running>}", "event": "DONE: NOT DONE:", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:07:38.642078Z"}
{"reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:07:38.642356Z"}
Traceback (most recent call last):
File "/Users/JeffreyWan/.pyenv/versions/de_etl/bin/de-etl", line 33, in <module>
...
raise KafkaException
cimpl.KafkaException
{"qsize": 0, "reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:08:37.643285Z"}
{"qsize": 0, "reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:09:37.649044Z"}
{"qsize": 0, "reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:10:37.652676Z"}
Процесс остается живым, но я не хочу, чтобы он оставался живым… как вы можете видеть, регистратор продолжает работать, даже если считыватель закончен. Что я делаю не так?
Комментарии:
1. Вы должны отменить отложенные фьючерсы (
not_done
те, которые):for f in not_done: f.cancel()