#python #django #python-3.x #apache-kafka
#python #django #python-3.x #apache-kafka
Вопрос:
У меня есть приложение Django, которое имеет интеграцию с Kafka для обработки некоторых заказов. Темы в очереди Kafka создаются динамически, поэтому потребители также должны быть подписаны динамически. Теперь, когда я инициализирую потребителя, он блокирует основной поток, поэтому я должен запустить потребителя в фоновом потоке, но я не вижу никаких операторов печати, поэтому я не уверен, инициализирован ли потребитель, также, если это правильный подход для этого?
def kafka_consumer(topic) :
try :
if topic is None :
raise Exception("Topic is none, unable to initialize kafka consumer")
conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest'}
c = Consumer(conf)
print("Subscribing consumer to topic ",topic[0])
c.subscribe(topic)
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
sys.stderr.write('%% %s [%d] at offset %d with key %s:n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
try :
print(json.loads(msg.value()))
print("---------------------------------")
objs = serializers.deserialize("json", msg.value())
for obj in objs :
print(obj)
print(obj.object)
except Exception as e :
import traceback
print(traceback.format_exc())
except Exception as e:
import traceback
print(traceback.format_exc())
finally:
c.close()
except Exception as e:
import traceback
print(traceback.format_exc())
Ниже показано, как я вызываю функцию :
try :
topic = []
topic.append(offer.offering_order_id)
background_thread = Thread(target=kafka_consumer, args=(topic))
background_thread.start()
except Exception as e :
import traceback
print(traceback.format_exc())
Может кто-нибудь помочь мне с архитектурой, пожалуйста?
Комментарии:
1. Наличие динамических тем кажется плохой идеей… Какие данные вы отправляете сюда?
2. Примечание: вместо того, чтобы выполнять многопоточность самостоятельно, вы могли бы взглянуть на библиотеку aiokafka
3. Данные в основном представляют собой заказы на покупку, поэтому для каждого вида предложений (читай уникальные адреса) я создаю тему, поэтому заказы из разных тем могут обрабатываться параллельно, как только предложения больше не будут доступны, я планирую удалить тему.
4. Что определяет «живость»? Похоже, вам просто нужна очередь pub-sub, не обязательно постоянный журнал, такой как Kafka. И у вас может быть одна
buy-orders
тема с более чем 100 разделами, например, если вы заботитесь о параллелизме