Как искать сообщения в кафке для заданного времени ожидания

#python #apache-kafka #confluent-kafka-python

Вопрос:

У меня есть такой тестовый сценарий:

  1. вставьте некоторые записи в БД
  2. Подождите, пока эти записи будут обработаны Кафкой
  3. Проверьте, существуют ли эти записи в теме Кафки

Шаг 2 иногда занимает 5 секунд, а иногда 2 минуты.
1-й шаг возвращает некоторые идентификаторы, которые я хочу найти на шаге 2 (тема Кафки). Чего я хочу добиться-это, чтобы проверить, если эти документы(с 1-го шага) обрабатываются в Кафка в данный тайм-аут (например, 20 секунд), и двигаться вперед с другими мерами, например:
извлечь все сообщения из Кафки и чек, если эти документы существуют, если не на другой электронный.g. 20 сек. извлеките эти сообщения и проверьте, есть ли идентификаторы во вновь полученных сообщениях. Если все идентификаторы найдены, то верните список этих сообщений (до истечения тайм — аута) или после истечения тайм-аута-верните список уже полученных сообщений.

Надеюсь, я хорошо объяснил это, потому что это может быть немного сложно, и мне интересно, есть ли такая возможность сделать это.

До сих пор я выяснял только такое.

     def fetch_all_kafka_records_from_topic(self, items_to_verify: List, timeout: int = 30) -> List:
        counter = 0
        result = []
        while counter <= timeout:
            res = self._get_all_kafka_KM_values() # at the very bottom  it's super(AvroConsumer,self).poll(timeout) from confluent_kafka
            result.extend(res)
            if all(item in res for item in items_to_verify):
                break
            time.sleep(1)
            counter  = 1
        return result
 

Есть ли какой-нибудь предпочтительный способ сделать это? Может быть, что-то с нитками, декораторами? Я просто не знаю, как проверить, существует ли что — то в списке — если да, то верните что-то, если нет-попробуйте еще раз и так далее… пока не будет достигнут тайм-аут.

Комментарии:

1. Вам действительно следует рассмотреть возможность использования ksql или KTable для поиска ключей, а не простого потребителя