Обработка сообщений во временном окне с использованием Kafka

#python #apache-kafka

#python #apache-kafka

Вопрос:

Этот код:

 from confluent_kafka import Consumer, KafkaError

settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'client.id': 'client-1',
    'enable.auto.commit': True,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'}
}

c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            print('Received message: {0}'.format(msg.value()))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occured: {0}'.format(msg.error().str()))

except KeyboardInterrupt:
    pass

finally:
    c.close()
  

взято изhttps://www.confluent.io/blog/introduction-to-apache-kafka-for-python-programmers

Я пытаюсь обновить этот код, чтобы тема опрашивалась каждую секунду, но обрабатывалась статистика для всех сообщений в течение 1 минуты.

Вот как я планирую решить:

заменить msg = c.poll(0.1) на msg = c.poll(1)

введите новую переменную i , которая будет поддерживать текущее количество сообщений за данную минуту.

Создайте новый класс SharedQueue для хранения данных, подлежащих обработке:

 class SharedQueue:

    data_queue = deque(maxlen=1000000)

    def append_data_queue(self, msg):
        self.data_queue.append(msg)

    def get_data_queue(self, record_key, record_value, timestamp):
        return self.append_data_queue
  

С изменениями код становится:

 from confluent_kafka import Consumer, KafkaError

settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'client.id': 'client-1',
    'enable.auto.commit': True,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'}
}

sq = SharedQueue()

c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    i = 0
    while True:
        i = i   1
        msg = c.poll(1)
        sq.append_data_queue(msg)
        if msg is None:
            continue
        elif not msg.error():
            print('Received message: {0}'.format(msg.value()))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occured: {0}'.format(msg.error().str()))
        if i == 60: 
            //process the last 60 items of the queue.
            i = 0


except KeyboardInterrupt:
    pass
  

Но это не очень хорошее решение, так как poll может вернуться немедленно, если есть доступные записи.

Как я могу реализовать обработку сообщений, полученных во временном окне? Я на правильном пути, внедряя очередь?

Комментарии:

1. 1) 60 сообщений не обязательно будут находиться в любом таком «временном окне», поскольку вы не можете гарантировать скорость потребления или производства. 2) Вы, конечно, хотите остановить цикл, когда i == 60 и возобновить, когда очередь пуста?

2. Если вам нужна такая оконная семантика, PySpark или Flink могут обеспечить больший контроль над такими интерфейсами