#python #asynchronous #twisted
#python #асинхронный #скрученный
Вопрос:
Я новичок в Twisted, это моя первая программа.
Я не могу найти способ использовать KafkaConsumer из библиотеки kafka-python и использовать treq для запуска post-запроса к elasticsearch.
Я мог бы разложить проблему на мелкие кусочки: создать итератор-потребитель kafka и считывать из него данные (тема может быть огромной)
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
v.value
опубликовать в elasticsearch с помощью treq
def post(self):
d = treq.post('http://es:9200/pro/pr/', self.data)
d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x))
запустите реактор
from twisted.internet import reactor
reactor.callWhenRunning(consumeKafka)
reactor.run()
Есть идеи, как заставить это работать?
Ответ №1:
Я вообще не использую Kafka, поэтому я не уверен, что это сработает для вас. Кроме того, я предполагаю, что у вас возникли проблемы с одновременным запуском Kafka и treq. Общий способ, которым я работаю с итераторами в Twisted, — это использовать inlineCallbacks
для ожидания результата, а затем что-то делать с этим результатом впоследствии.
from twisted.internet import defer
@defer.inlineCallbacks
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
value = yield v.value
# do stuff with value
Затем вы можете просто вызвать эту функцию, а реактор позаботится обо всем остальном. Итак, ваш основной раздел будет выглядеть так:
consumeKafka()
reactor.run()
Обратите внимание, что consumeKafka()
функция возвращает a Deferred
, поэтому вы добавляете обратные вызовы и ошибки по мере необходимости. Как только вы освоитесь с этой моделью, взгляните на Cooperator
объекты для большей функциональности.
Комментарии:
1. спасибо за ваш ответ, я все еще пытаюсь заставить его работать, но я думаю, что сначала мне нужно прочитать документацию twisted. Это непросто.