Как отправлять и использовать json-сообщения с помощью confluent-kafka в Python

#django #python-3.x #apache-kafka #confluent-platform

#django #python-3.x #apache-kafka #confluent-платформа

Вопрос:

Я довольно новичок в Python и начинаю работать с Kafka. Итак, я настроил брокера Kafka и пытаюсь связаться с ним с помощью confluent-kafka. Я смог создавать и использовать простые сообщения, используя его, однако у меня есть несколько объектов django, которые мне нужно сериализовать и отправить их в kafka.

Ранее я использовал kafka-python, на котором я мог отправлять и использовать сообщения json, однако у меня были некоторые странные проблемы с этим.

#Producer.py

 def send_message(topic,message) :
try :
    try :
        p.produce(topic,message,callback=delivery_callback)
    except BufferError as b :
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try againn' %len(p))
    # Serve delivery callback queue.
    # NOTE: Since produce() is an asynchronous API this poll() call
    #       will most likely not serve the delivery callback for the
    #       last produce()d message.
    p.poll(0)
    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveriesn' % len(p))
    p.flush()
except Exception as e :
    import traceback
    print(traceback.format_exc())
  

#Consumer.py

 conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest'}
c = Consumer(conf)
c.subscribe(["mykafka"])
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            sys.stderr.write('%% %s [%d] at offset %d with key %s:n' %
                                (msg.topic(), msg.partition(), msg.offset(),
                                str(msg.key())))
            print(msg.value())
except Exception as e:
    import traceback
    print(traceback.format_exc())
finally:
    c.close()
  

Я сериализую свои объекты модели django следующим образом :

 from django.core import serializers
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ obj, ])
  

Итак, какие изменения мне нужно внести в моего производителя и потребителя, чтобы создавать и использовать сообщения json?

Ответ №1:

Попробуйте для производителя

 send_message(topic, serialized_obj)
  

И потребитель, вы бы десериализовали байты только в строку

 print(msg.value().decode('utf8'))
  

Если вам нужны объекты json, вы можете использовать json.loads