Параллельные фьючерсы Python не возвращаются, даже если есть исключение. параллельные.фьючерсы.FIRST_EXCEPTION не убивает будущее

#python #concurrent.futures

Вопрос:

У меня есть этот код:

 done, not_done = concurrent.futures.wait([reader, writer], return_when=concurrent.futures.FIRST_EXCEPTION)
self.logger.info("DONE: NOT DONE:", done=done, not_done=not_done)
 

где читатель и писатель-это соединения с ресурсом сообщений.

Я хочу, чтобы это вызывало ошибку, когда done возникает ошибка. Как мне это сделать?

Но это то, что происходит, когда возникает ошибка:

 {"done": "{<Future at 0x1031c6c10 state=finished raised KafkaException>}", "not_done": "{<Future at 0x1031c6810 state=running>}", "event": "DONE: NOT DONE:", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:07:38.642078Z"}
{"reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:07:38.642356Z"}
Traceback (most recent call last):
  File "/Users/JeffreyWan/.pyenv/versions/de_etl/bin/de-etl", line 33, in <module>
    ...
    raise KafkaException
cimpl.KafkaException
{"qsize": 0, "reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:08:37.643285Z"}
{"qsize": 0, "reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:09:37.649044Z"}
{"qsize": 0, "reader": "<Future at 0x1031c6c10 state=finished raised KafkaException>", "writer": "<Future at 0x1031c6810 state=running>", "event": "KafkaArchiver status", "logger": "de_core.operators.archive", "level": "info", "timestamp": "2021-09-24T17:10:37.652676Z"}
 

Процесс остается живым, но я не хочу, чтобы он оставался живым… как вы можете видеть, регистратор продолжает работать, даже если считыватель закончен. Что я делаю не так?

Комментарии:

1. Вы должны отменить отложенные фьючерсы ( not_done те, которые): for f in not_done: f.cancel()