#apache-kafka #schema #apache-nifi #avro #confluent-schema-registry
#apache-kafka #схема #apache-nifi #avro #confluent-schema-registry
Вопрос:
Я использую Kafka для десериализации сообщений Avro. Для этого программа должна извлечь соответствующую схему из реестра схем. Потоковое приложение реализовано как процессор Nifi, который работает сам по себе. Проблема в том, что после каждого потока запрашивается новая схема. Похоже, что ни одна схема не кэшируется.
Эта часть журнала здесь является проблемой:
2019-04-16 22:08:51,333 INFO [Timer-Driven Process Thread-2] i.c.k.s.KafkaAvroDeserializerConfig KafkaAvroDeserializerConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
specific.avro.reader = false
При вызове CachedSchemaRegistryClient можно подумать, что схемы кэшируются автоматически?
private SchemaRegistryClient schemaRegistryClient;
this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, 1000);
Поэтому, когда требуется схема, вызов со следующим
return schemaRegistryClientProvider.getSchemaRegistryClient().getByID(avroSchemaId);
Предоставляет его. Но каждый раз мы делаем новый запрос GET.
В противном случае процессор работает по назначению. Просто все эти дополнительные вызовы Schemaregistry со временем становятся огромным бременем. Любые предложения будут оценены
Редактировать:
Кэш работает так, как задумано. Это была просто проблема инициализации, из-за которой он повторно загружал URL-адрес схемы при каждом OnTrigger() .
Комментарии:
1. Вы написали этот процессор? NiFi имеет собственную реализацию клиента реестра Schema.
2. Нет, я этого не делал. Но мне нужно его отладить. Либо реализация не включает кеш, либо я использую его неправильно / вообще не использую.
3. В прошлый раз, когда я просматривал код Nifi, он регулярно опрашивает все имена объектов. Я не уверен в функции get by id
Ответ №1:
Код для извлечения схемы не кажется правильным.
Интерфейс SchemaRegistryClient имеет только два метода:
RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException;
RecordSchema getSchema(int schemaId) throws IOException, SchemaNotFoundException;
Я не уверен, откуда берется метод GetById, но я думаю, что вы каким-то образом обходите кэширование, используя другой путь кода.