Kafka producer в другом процессе, похоже, не работает в python

#python #multithreading #apache-kafka #multiprocessing #microservices

#python #многопоточность #apache-kafka #многопроцессорность #микросервисы

Вопрос:

Я запускаю микросервис на python, который реализует несколько обработчиков для каждого вида сообщений.

 class MsFeatureLandmark(BaseMicroservice):

    def __init__(self):
        self.config = safe_load(open(sys.argv[1]))
        client = MongoClient(self.config.get('mongo').get('connection_url'))
        database = client[self.config.get('mongo').get('mongo_db')]
        self.dict = {
            MessageType.create_model.name: ComputeLocDescHandler(self.config),
            MessageType.detect_all.name: ExtractFeatureHandler(self.config, database),
            MessageType.detect_landmark.name: ExtractFeatureHandler(self.config, database)

        }
        super().__init__(self.dict, self.config.get('kafka'))

    def on_message_received(self, generic_message):
        # self.dict.get(generic_message.metadata_type).handle(generic_message.message)
        p = Process(target=self.dict.get(generic_message.metadata_type).handle, args=(generic_message.message,))
        p.daemon = True
        p.start()


MsFeatureLandmark().run()
  

При каждом сообщении, которое я получаю, я запускаю соответствующий метод .handle() .

В конце вычисления (которое включает tensorflow) Я использую методы, унаследованные от суперкласса, для отправки сообщения с использованием kafka

 def write_message(self, message):
    if(self.is_prod_init):
        output_topic = self.config.get('kafka').get('output_topic')
        cl.logging.info("Sending on "   output_topic   " message: "   str(message))
        self.producer.send(output_topic, message)
    else:
        raise ValueError('Producer is not initialized.')

def init_producer(self):
    self.producer = KafkaProducer(bootstrap_servers=self.config.get('kafka').get('bootstrap_servers'),
                                  value_serializer=lambda m: json.dumps(m).encode('utf-8'))
    self.is_prod_init = True
    self.producer.flush()
  

Когда я запускаю весь этот код синхронизированным способом (без использования Process (target=self. и т.д. ) ), все работает правильно, но у производителя наступает тайм-аут, поскольку обработка данных занимает слишком много времени.

Если я запускаю его с помощью процесса, я не получаю сообщений об ошибках, но производитель, похоже, не выдает никаких сообщений по какой-либо причине.

Чего мне не хватает?

РЕДАКТИРОВАТЬ: По какой-то причине при последнем запуске этого микросервиса возникло исключение, и потребитель (который запускается в другом процессе) успешно прочитал сообщение. Это заставило меня задуматься: если я создам исключение, будет ли отправлено сообщение? ДА.

     self.write_message(message)
    raise Exception('spakkiggnustel')
  

добавление этой последней строки «решило» проблему. Почему? Я еще больше запутался, чем был раньше