#apache-kafka #apache-kafka-streams
#apache-kafka #apache-kafka-streams
Вопрос:
У меня есть тема Kafka — давайте activity-daily-aggregate, и я хочу выполнить агрегирование (добавление / дополнение) с помощью KGroupTable. Итак, я прочитал тему, используя
final KTable<String, GenericRecord> inputKTable =
builder.table("activity-daily-aggregate",Consumed.with(new StringSerde(), getConsumerSerde());
Note: getConsumerSerde - returns >> new GenericAvroSerde(mockSchemaRegistryClient)
2.Next Step,
inputKTable.groupBy(
(key,value)->KeyValue.pair(KeyMapper.generateGroupKey(value), new JsonValueMapper().apply(value)),
Grouped.with(AppSerdes.String(), AppSerdes.jsonNode())
);
Перед шагами 1 и 2 я настроил MockSchemaRegistryClient с
mockSchemaRegistryClient.register("activity-daily-aggregate-key",
Schema.parse(AppUtils.class.getResourceAsStream("/avro/key.avsc")));
mockSchemaRegistryClient.register("activity-daily-aggregate-value",
Schema.parse(AppUtils.class.getResourceAsStream("/avro/daily-activity-aggregate.avsc")))
Пока я запускаю тестовые примеры с использованием топологии, я получаю сообщение об ошибке на шаге 2.
org.apache.kafka.streams.errors.Исключение StreamsException: исключение, обнаруженное в процессе. TaskId=0_0, processor= KSTREAM-SOURCE-0000000011, topic= activity-daily-aggregate, раздел = 0, смещение = 0, stacktrace =org.apache.kafka.common.errors.Исключение SerializationException: ошибка при получении схемы Avro: {«тип»: «запись», «имя»:»Фактическость», «пространство имен»: «com.ascendlearning.avro», «поля»: …..}
Вызвано: io.confluent.kafka.schemaregistry.client.rest.exceptions.Исключение RestClientException: схема не найдена; код ошибки: 404001
Ошибка исчезает, когда я регистрирую схему с помощью mockSchemaRegistryClient,
stream-app-id-activity-daily-aggregate-STATE-STORE-0000000010-changelog-key
stream-app-id-activity-daily-aggregate-STATE-STORE-0000000010-changelog-value => /avro/daily-activity-aggregate.avsc
Нужно ли нам делать этот шаг? Я думал, что это может быть обработано автоматически топологией
Комментарии:
1. Я не знаю, почему это требуется, но чтобы избежать жесткого кодирования, я назвал процессор и хранилище состояний в определении DSL. ` final KTable<Строка, GenericRecord> learnerActivityDailyAggregateKTable = builder .table(«activity-daily-aggregate», потребляется.с помощью(new StringSerde(),new GenericAvroSerde(analyticsSchemaRegistryClient)) .withName(«ACTIVITY_TOPIC_READER»),Materialized.as («ACTIVITY_TOPIC_READER_STATE_STORE»)); `
2. Таким образом, раздел реестра схемы изменяется на stream-app-id-ACTIVITY_TOPIC_READER_STATE_STORE-key и stream-app-id-ACTIVITY_TOPIC_READER_STATE_STORE-value
Ответ №1:
Из блога, https://blog.jdriven.com/2019/12/kafka-streams-topologytestdriver-with-avro /
Когда вы настраиваете один и тот же макет:// URL в обоих свойствах, переданных в TopologyTestDriver, а также для экземпляров (де) сериализатора, переданных в createInputTopic и createOutputTopic , все (де) сериализаторы будут использовать один и тот же MockSchemaRegistryClient с одним хранилищем схем в памяти.
// Configure Serdes to use the same mock schema registry URL
Map<String, String> config = Map.of(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
avroUserSerde.configure(config, false);
avroColorSerde.configure(config, false);
// Define input and output topics to use in tests
usersTopic = testDriver.createInputTopic(
"users-topic",
stringSerde.serializer(),
avroUserSerde.serializer());
colorsTopic = testDriver.createOutputTopic(
"colors-topic",
stringSerde.deserializer(),
avroColorSerde.deserializer());
Я не передавал макет URL-адреса клиентской схемы реестра в serdes, переданном в раздел ввода / вывода.