Производитель Avro отправляет ключ без схемы ключа

#python #apache-kafka #avro #kafka-producer-api

#питон #apache-kafka #avro #kafka-producer-api #python

Вопрос:

Я использую Avro Producer в Python 2.7. Мне нужно отправить сообщение с ключом и значением, значение имеет Avro-Schema в теме, но для ключа нет Avro-Schema (я не могу добавить схему по причинам, связанным с унаследованным ключом).

Это мой код:

 def main():
    kafkaBrokers = os.environ.get('KAFKA_BROKERS')
    schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
    topic = os.environ.get('KAFKA_TOPIC')

    subject = '${}-value'.format(topic)
    sr = CachedSchemaRegistryClient(schemaRegistry)

    schema = sr.get_latest_schema(subject).schema

    value_schema = avro.loads(str(schema))

    url = 'test.com'

    value = {'url': u'test.com', 'priority': 10}

    avroProducer = AvroProducer({
        'bootstrap.servers': kafkaBrokers,
        'schema.registry.url': schemaRegistry
    }, default_value_schema=value_schema)


    key = 1638895406382020875
    
    avroProducer.produce(topic=topic, value=value, key=key)
    avroProducer.flush()
  

Я получаю следующую ошибку:

 raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key
  

Если я удалю ключ из функции создания:

 avroProducer.produce(topic=topic, value=value)
  

Это работает.

Как можно отправить ключ без наличия схемы?

Ответ №1:

Вам нужно будет использовать обычный Producer и самостоятельно выполнять функции сериализации

 from confluent_kafka import avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema  # extract function definition 

value_schema = avro.load('avro_schemas/value.avsc')  # TODO: Create avro_schemas folder 

p = Producer({'bootstrap.servers': bootstrap_servers})

value_payload = serialize_avro(topic, value_schema, value, is_key=False)
p.produce(topic, key=key, value=value_payload, callback=delivery_report)
  

Комментарии:

1. MessageSerializer будет достаточным для сериализации объекта как avro сериализации?

2. ДА. Я просто переименовал его import MessageSerializer as AvroSerializer

Ответ №2:

AvroProducer предполагается, что оба ключа и значения кодируются в реестре схемы, добавляя магический байт и идентификатор схемы к полезной нагрузке как ключа, так и значения.

Если вы хотите использовать пользовательскую сериализацию для ключа, вы могли бы использовать Producer вместо AvroProducer . Но вы будете нести ответственность за сериализацию ключа (используя любой формат, который вы хотите) и значений (что означает кодирование значения и добавление магического байта и идентификатора схемы). Чтобы узнать, как это делается, вы можете посмотреть на AvroProducer код.

Но это также означает, что вам придется написать свой собственный AvroConsumer и вы не сможете использовать kafka-avro-console-consumer .

Комментарии:

1. параметры десериализатора могут быть переданы потребителю консоли avro для ключей, отличных от Avro