#python #apache-kafka #confluent-kafka-python
Вопрос:
У меня есть такой тестовый сценарий:
- вставьте некоторые записи в БД
- Подождите, пока эти записи будут обработаны Кафкой
- Проверьте, существуют ли эти записи в теме Кафки
Шаг 2 иногда занимает 5 секунд, а иногда 2 минуты.
1-й шаг возвращает некоторые идентификаторы, которые я хочу найти на шаге 2 (тема Кафки). Чего я хочу добиться-это, чтобы проверить, если эти документы(с 1-го шага) обрабатываются в Кафка в данный тайм-аут (например, 20 секунд), и двигаться вперед с другими мерами, например:
извлечь все сообщения из Кафки и чек, если эти документы существуют, если не на другой электронный.g. 20 сек. извлеките эти сообщения и проверьте, есть ли идентификаторы во вновь полученных сообщениях. Если все идентификаторы найдены, то верните список этих сообщений (до истечения тайм — аута) или после истечения тайм-аута-верните список уже полученных сообщений.
Надеюсь, я хорошо объяснил это, потому что это может быть немного сложно, и мне интересно, есть ли такая возможность сделать это.
До сих пор я выяснял только такое.
def fetch_all_kafka_records_from_topic(self, items_to_verify: List, timeout: int = 30) -> List:
counter = 0
result = []
while counter <= timeout:
res = self._get_all_kafka_KM_values() # at the very bottom it's super(AvroConsumer,self).poll(timeout) from confluent_kafka
result.extend(res)
if all(item in res for item in items_to_verify):
break
time.sleep(1)
counter = 1
return result
Есть ли какой-нибудь предпочтительный способ сделать это? Может быть, что-то с нитками, декораторами? Я просто не знаю, как проверить, существует ли что — то в списке — если да, то верните что-то, если нет-попробуйте еще раз и так далее… пока не будет достигнут тайм-аут.
Комментарии:
1. Вам действительно следует рассмотреть возможность использования ksql или KTable для поиска ключей, а не простого потребителя