#python #apache-kafka #python-multiprocessing #kafka-python
Вопрос:
Кафка настроен локально:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
и создан пример тестовой темы для хранения данных:
bin/kafka-topics.sh --create --topic fortest --bootstrap-server localh
ost:9092 --replication-factor 1 --partitions 1
Пример сценария создается для отправки примеров данных, а затем для их чтения из того же раздела теста
import time
from kafka import KafkaProducer, KafkaConsumer
import multiprocessing
TOPIC = 'fortest'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
group_id='my-consumer-1'
)
def store_message():
for _ in range(100):
msg = b'message'
producer.send(topic=TOPIC, value=msg)
print(f'{msg} sent by Producer')
time.sleep(3)
def get_processed_message():
while True:
messages = consumer.poll(timeout_ms=5000)
if not messages:
print('wait for messsages')
time.sleep(5)
else:
print(f"Get messages: {messages.values()}")
Это работает последовательно, как:
if __name__ == '__main__':
store_message()
get_processed_message()
Но вопрос в том, можно ли запускать обе функции одновременно, когда производитель постоянно отправляет, а потребитель постоянно читает сообщения с использованием одной и той же темы в одно и то же время? Попытался сделать это с помощью многопроцессорной обработки:
if __name__ == '__main__':
produce_initial_message = multiprocessing.Process(target=store_message)
consume_processed_message = multiprocessing.Process(target=get_processed_message)
produce_initial_message.start()
consume_processed_message.start()
но работает только отправка, consumer.poll()
в этом случае никогда не возвращает никакого значения и продолжает «ждать» сообщений. То же самое, если переместить инициализацию и логику потребителя в другой .py
сценарий и запустить их одновременно в разных терминалах
Как это нужно настроить, чтобы работать таким образом?(Или для этого требуется более сложная логика/дополнительные агенты, помимо потребителя и производителя, для обработки?)
Комментарии:
1. Попробуйте позвонить
producer.flush()
за пределы цикла «Для». Кроме того, вам действительно не нужно, чтобы он спал 3 секунды между каждым сообщением2. @OneCricketeer спасибо, что указал, действительно, этого не хватало. задержка в 3 секунды добавляется только для лучшей визуализации рабочего процесса
Ответ №1:
Решается с помощью:
- Изменение
store_message()
метода для использования бесконечного цикла, а также с использованиемproducer.flush()
после каждого отправленного сообщения
def store_message():
while True:
msg = b'message'
producer.send(topic=TOPIC, value=msg)
print(f'{msg} sent by Producer')
producer.flush()
time.sleep(3)
- Используйте потоковую обработку для одновременного выполнения вместо многопроцессорной обработки:
if __name__ == '__main__':
t_producer = threading.Thread(target=store_message)
t_consumer = threading.Thread(target=get_processed_message)
t_producer.setDaemon(True)
t_consumer.setDaemon(True)
t_producer.start()
t_consumer.start()
while True:
pass
Теперь все идет так, как планировалось, спасибо.
Комментарии:
1. Кстати, я бы посоветовал смывать реже, чем каждое сообщение