кафка-python: создание и использование сообщений из одной и той же темы одновременно путем запуска параллельных процессов/сценариев

#python #apache-kafka #python-multiprocessing #kafka-python

Вопрос:

Кафка настроен локально:

 bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
 

и создан пример тестовой темы для хранения данных:

 bin/kafka-topics.sh --create --topic fortest --bootstrap-server localh
ost:9092 --replication-factor 1 --partitions 1
 

Пример сценария создается для отправки примеров данных, а затем для их чтения из того же раздела теста

 import time
from kafka import KafkaProducer, KafkaConsumer
import multiprocessing

TOPIC = 'fortest'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    group_id='my-consumer-1'
    )

def store_message():
    for _ in range(100):
        msg = b'message'
        producer.send(topic=TOPIC, value=msg)
        print(f'{msg} sent by Producer')
        time.sleep(3)

def get_processed_message():
    while True:
        messages = consumer.poll(timeout_ms=5000)

        if not messages:
             print('wait for messsages')
             time.sleep(5)
        else:
            print(f"Get messages: {messages.values()}")
 

Это работает последовательно, как:

 if __name__ == '__main__':
    store_message()
    get_processed_message()
 

Но вопрос в том, можно ли запускать обе функции одновременно, когда производитель постоянно отправляет, а потребитель постоянно читает сообщения с использованием одной и той же темы в одно и то же время? Попытался сделать это с помощью многопроцессорной обработки:

 if __name__ == '__main__':
    produce_initial_message = multiprocessing.Process(target=store_message)
    consume_processed_message = multiprocessing.Process(target=get_processed_message)
    produce_initial_message.start()
    consume_processed_message.start()
 

но работает только отправка, consumer.poll() в этом случае никогда не возвращает никакого значения и продолжает «ждать» сообщений. То же самое, если переместить инициализацию и логику потребителя в другой .py сценарий и запустить их одновременно в разных терминалах
Как это нужно настроить, чтобы работать таким образом?(Или для этого требуется более сложная логика/дополнительные агенты, помимо потребителя и производителя, для обработки?)

Комментарии:

1. Попробуйте позвонить producer.flush() за пределы цикла «Для». Кроме того, вам действительно не нужно, чтобы он спал 3 секунды между каждым сообщением

2. @OneCricketeer спасибо, что указал, действительно, этого не хватало. задержка в 3 секунды добавляется только для лучшей визуализации рабочего процесса

Ответ №1:

Решается с помощью:

  1. Изменение store_message() метода для использования бесконечного цикла, а также с использованием producer.flush() после каждого отправленного сообщения
 def store_message():
    while True:
        msg = b'message'
        producer.send(topic=TOPIC, value=msg)
        print(f'{msg} sent by Producer')
        producer.flush()
        time.sleep(3)
 
  1. Используйте потоковую обработку для одновременного выполнения вместо многопроцессорной обработки:
 if __name__ == '__main__':
    t_producer = threading.Thread(target=store_message)
    t_consumer = threading.Thread(target=get_processed_message)
    t_producer.setDaemon(True)
    t_consumer.setDaemon(True)
    t_producer.start()
    t_consumer.start()
    while True:
        pass
 

Теперь все идет так, как планировалось, спасибо.

Комментарии:

1. Кстати, я бы посоветовал смывать реже, чем каждое сообщение