#python #apache-kafka
#python #apache-kafka
Вопрос:
Этот код:
from confluent_kafka import Consumer, KafkaError
settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'client.id': 'client-1',
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}
c = Consumer(settings)
c.subscribe(['mytopic'])
try:
while True:
msg = c.poll(0.1)
if msg is None:
continue
elif not msg.error():
print('Received message: {0}'.format(msg.value()))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))
except KeyboardInterrupt:
pass
finally:
c.close()
взято изhttps://www.confluent.io/blog/introduction-to-apache-kafka-for-python-programmers
Я пытаюсь обновить этот код, чтобы тема опрашивалась каждую секунду, но обрабатывалась статистика для всех сообщений в течение 1 минуты.
Вот как я планирую решить:
заменить msg = c.poll(0.1)
на msg = c.poll(1)
введите новую переменную i
, которая будет поддерживать текущее количество сообщений за данную минуту.
Создайте новый класс SharedQueue для хранения данных, подлежащих обработке:
class SharedQueue:
data_queue = deque(maxlen=1000000)
def append_data_queue(self, msg):
self.data_queue.append(msg)
def get_data_queue(self, record_key, record_value, timestamp):
return self.append_data_queue
С изменениями код становится:
from confluent_kafka import Consumer, KafkaError
settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'client.id': 'client-1',
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}
sq = SharedQueue()
c = Consumer(settings)
c.subscribe(['mytopic'])
try:
i = 0
while True:
i = i 1
msg = c.poll(1)
sq.append_data_queue(msg)
if msg is None:
continue
elif not msg.error():
print('Received message: {0}'.format(msg.value()))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))
if i == 60:
//process the last 60 items of the queue.
i = 0
except KeyboardInterrupt:
pass
Но это не очень хорошее решение, так как poll
может вернуться немедленно, если есть доступные записи.
Как я могу реализовать обработку сообщений, полученных во временном окне? Я на правильном пути, внедряя очередь?
Комментарии:
1. 1) 60 сообщений не обязательно будут находиться в любом таком «временном окне», поскольку вы не можете гарантировать скорость потребления или производства. 2) Вы, конечно, хотите остановить цикл, когда
i == 60
и возобновить, когда очередь пуста?2. Если вам нужна такая оконная семантика, PySpark или Flink могут обеспечить больший контроль над такими интерфейсами