#python #apache-kafka #confluent-platform
Вопрос:
Я использую confluent-кафку, и мне нужно сериализовать свои ключи в виде строк и создать несколько сообщений. У меня есть рабочий код для случая, когда я извлекаю схему из реестра схем и использую ее для создания сообщения. Проблема в том, что это не удается, когда я вместо этого пытаюсь прочитать схему из локального файла.
Приведенный ниже код является рабочим для реестра схем:
import argparse
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka import SerializingProducer
import avro.schema
SCHEMA_HOST = '192.168.40.10'
TOPIC = 'my_topic'
SCHEMA = 'path/to/schema.avsc'
# Just parse argumments
parser = argparse.ArgumentParser(description="Avro Kafka Generator")
parser.add_argument('--schema_registry_host', default=SCHEMA_HOST, help="schema registry host")
parser.add_argument('--schema', type=str, default=SCHEMA, help="schema to produce under")
parser.add_argument('--topic', type=str, default=TOPIC, help="topic to publish to")
parser.add_argument('--frequency', type=float, default=1.0, help="number of message per second")
args = parser.parse_args()
# Actual code
schema_registry_conf = {'url': "http://{}:8081".format(SCHEMA_HOST)}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
schema = schema_registry_client.get_latest_version(subject_name=TOPIC "-value")
# schema = schema_registry_client.get_schema(schema.schema_id)
schema = schema_registry_client.get_schema(schema.schema_id)
schema_str = schema.schema_str
pro_conf = {"auto.register.schemas": False}
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=schema_str, conf=pro_conf)
conf = {'bootstrap.servers': "{}:9095".format(args.schema_registry_host),
'schema.registry.url': "http://{}:8081".format(args.schema_registry_host)}
# avro_producer = AvroProducer(conf, default_value_schema=value_schema)
producer_conf = {'bootstrap.servers': "{}:9095".format(SCHEMA_HOST),
'key.serializer': StringSerializer('utf_8'),
'value.serializer': avro_serializer}
avro_producer = SerializingProducer(producer_conf)
Но когда я пытаюсь использовать вариант для локального файла, это не удается:
# Read schema from local file
value_schema = avro.schema.Parse(open(args.schema, "r").read())
schema_str = open(args.schema, "r").read().replace(' ', '').replace('n', '')
pro_conf = {"auto.register.schemas": True}
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=schema_str, conf=pro_conf)
эта часть является общей для обеих версий:
producer_conf = {'bootstrap.servers': "{}:9095".format(SCHEMA_HOST),
'key.serializer': StringSerializer('utf_8'),
'value.serializer': avro_serializer}
avro_producer = SerializingProducer(producer_conf)
avro_producer.produce(topic=args.topic, value=message)
Ошибка, которую я получаю, заключается в следующем;
Ошибка Кафки{код=_VALUE_SERIALIZATION,val=-161,str=»Объект’RecordSchema’ не имеет атрибута ‘lookup_schema'»}
Очевидно, что это не лучший подход, и я думаю, что если он сработал, код кажется уродливым и подверженным ошибкам. Но это даже не работает, поэтому мне нужна помощь в том, как я мог бы прочитать локальную схему и использовать AvroSerializer
ее впоследствии.
Комментарии:
1. Ваша ошибка связана со значением, но заголовок вашего вопроса касается ключей (которые вы даже не отправляете)… Что это такое?
2. До сериализации строки ключа я не отправлял никакого ключа, но теперь мне нужно наложить это ограничение на ключ. Я не уверен, как я могу отправить ключ (или если быть честным)
3. Нет, это не обязательно, но
producer.send(topic, key="some string"...)
. Однако это не исправит ошибку, которую вы получаете. Кроме того, я не вижу большой разницы между использованием реестра и файла. Возможно, замена пробелов-не лучшая идея4. И как же мне тогда пройти мимо сообщения об ошибке?