#python #multithreading #apache-kafka #multiprocessing #microservices
#python #многопоточность #apache-kafka #многопроцессорность #микросервисы
Вопрос:
Я запускаю микросервис на python, который реализует несколько обработчиков для каждого вида сообщений.
class MsFeatureLandmark(BaseMicroservice):
def __init__(self):
self.config = safe_load(open(sys.argv[1]))
client = MongoClient(self.config.get('mongo').get('connection_url'))
database = client[self.config.get('mongo').get('mongo_db')]
self.dict = {
MessageType.create_model.name: ComputeLocDescHandler(self.config),
MessageType.detect_all.name: ExtractFeatureHandler(self.config, database),
MessageType.detect_landmark.name: ExtractFeatureHandler(self.config, database)
}
super().__init__(self.dict, self.config.get('kafka'))
def on_message_received(self, generic_message):
# self.dict.get(generic_message.metadata_type).handle(generic_message.message)
p = Process(target=self.dict.get(generic_message.metadata_type).handle, args=(generic_message.message,))
p.daemon = True
p.start()
MsFeatureLandmark().run()
При каждом сообщении, которое я получаю, я запускаю соответствующий метод .handle() .
В конце вычисления (которое включает tensorflow) Я использую методы, унаследованные от суперкласса, для отправки сообщения с использованием kafka
def write_message(self, message):
if(self.is_prod_init):
output_topic = self.config.get('kafka').get('output_topic')
cl.logging.info("Sending on " output_topic " message: " str(message))
self.producer.send(output_topic, message)
else:
raise ValueError('Producer is not initialized.')
def init_producer(self):
self.producer = KafkaProducer(bootstrap_servers=self.config.get('kafka').get('bootstrap_servers'),
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
self.is_prod_init = True
self.producer.flush()
Когда я запускаю весь этот код синхронизированным способом (без использования Process (target=self. и т.д. ) ), все работает правильно, но у производителя наступает тайм-аут, поскольку обработка данных занимает слишком много времени.
Если я запускаю его с помощью процесса, я не получаю сообщений об ошибках, но производитель, похоже, не выдает никаких сообщений по какой-либо причине.
Чего мне не хватает?
РЕДАКТИРОВАТЬ: По какой-то причине при последнем запуске этого микросервиса возникло исключение, и потребитель (который запускается в другом процессе) успешно прочитал сообщение. Это заставило меня задуматься: если я создам исключение, будет ли отправлено сообщение? ДА.
self.write_message(message)
raise Exception('spakkiggnustel')
добавление этой последней строки «решило» проблему. Почему? Я еще больше запутался, чем был раньше