Скрученный python для чтения из kafka и записи в elasticsearch

#python #asynchronous #twisted

#python #асинхронный #скрученный

Вопрос:

Я новичок в Twisted, это моя первая программа.

Я не могу найти способ использовать KafkaConsumer из библиотеки kafka-python и использовать treq для запуска post-запроса к elasticsearch.

Я мог бы разложить проблему на мелкие кусочки: создать итератор-потребитель kafka и считывать из него данные (тема может быть огромной)

 def consumeKafka():
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
    consumer.subscribe(['kafkapipeline'])
    for v in consumer:
        v.value
  

опубликовать в elasticsearch с помощью treq

 def post(self):
    d = treq.post('http://es:9200/pro/pr/', self.data)
    d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x))
  

запустите реактор

 from twisted.internet import reactor
reactor.callWhenRunning(consumeKafka)
reactor.run()
  

Есть идеи, как заставить это работать?

Ответ №1:

Я вообще не использую Kafka, поэтому я не уверен, что это сработает для вас. Кроме того, я предполагаю, что у вас возникли проблемы с одновременным запуском Kafka и treq. Общий способ, которым я работаю с итераторами в Twisted, — это использовать inlineCallbacks для ожидания результата, а затем что-то делать с этим результатом впоследствии.

 from twisted.internet import defer

@defer.inlineCallbacks
def consumeKafka():
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
    consumer.subscribe(['kafkapipeline'])
    for v in consumer:
        value = yield v.value
        # do stuff with value
  

Затем вы можете просто вызвать эту функцию, а реактор позаботится обо всем остальном. Итак, ваш основной раздел будет выглядеть так:

 consumeKafka()
reactor.run()
  

Обратите внимание, что consumeKafka() функция возвращает a Deferred , поэтому вы добавляете обратные вызовы и ошибки по мере необходимости. Как только вы освоитесь с этой моделью, взгляните на Cooperator объекты для большей функциональности.

Комментарии:

1. спасибо за ваш ответ, я все еще пытаюсь заставить его работать, но я думаю, что сначала мне нужно прочитать документацию twisted. Это непросто.