Ktable в KGroupTable — схема недоступна (схема журнала изменений хранилища состояний не зарегистрирована)

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

У меня есть тема Kafka — давайте activity-daily-aggregate, и я хочу выполнить агрегирование (добавление / дополнение) с помощью KGroupTable. Итак, я прочитал тему, используя

 final KTable<String, GenericRecord> inputKTable =
      builder.table("activity-daily-aggregate",Consumed.with(new StringSerde(), getConsumerSerde());

Note: getConsumerSerde - returns >> new GenericAvroSerde(mockSchemaRegistryClient)
 
 2.Next Step,
 
 inputKTable.groupBy(
        (key,value)->KeyValue.pair(KeyMapper.generateGroupKey(value), new JsonValueMapper().apply(value)),
        Grouped.with(AppSerdes.String(), AppSerdes.jsonNode())
        );
 

Перед шагами 1 и 2 я настроил MockSchemaRegistryClient с

  mockSchemaRegistryClient.register("activity-daily-aggregate-key",
                    Schema.parse(AppUtils.class.getResourceAsStream("/avro/key.avsc")));
       mockSchemaRegistryClient.register("activity-daily-aggregate-value",
                    Schema.parse(AppUtils.class.getResourceAsStream("/avro/daily-activity-aggregate.avsc")))
 

Пока я запускаю тестовые примеры с использованием топологии, я получаю сообщение об ошибке на шаге 2.

org.apache.kafka.streams.errors.Исключение StreamsException: исключение, обнаруженное в процессе. TaskId=0_0, processor= KSTREAM-SOURCE-0000000011, topic= activity-daily-aggregate, раздел = 0, смещение = 0, stacktrace =org.apache.kafka.common.errors.Исключение SerializationException: ошибка при получении схемы Avro: {«тип»: «запись», «имя»:»Фактическость», «пространство имен»: «com.ascendlearning.avro», «поля»: …..}

Вызвано: io.confluent.kafka.schemaregistry.client.rest.exceptions.Исключение RestClientException: схема не найдена; код ошибки: 404001

Ошибка исчезает, когда я регистрирую схему с помощью mockSchemaRegistryClient,

stream-app-id-activity-daily-aggregate-STATE-STORE-0000000010-changelog-key

stream-app-id-activity-daily-aggregate-STATE-STORE-0000000010-changelog-value => /avro/daily-activity-aggregate.avsc

Нужно ли нам делать этот шаг? Я думал, что это может быть обработано автоматически топологией

Комментарии:

1. Я не знаю, почему это требуется, но чтобы избежать жесткого кодирования, я назвал процессор и хранилище состояний в определении DSL. ` final KTable<Строка, GenericRecord> learnerActivityDailyAggregateKTable = builder .table(«activity-daily-aggregate», потребляется.с помощью(new StringSerde(),new GenericAvroSerde(analyticsSchemaRegistryClient)) .withName(«ACTIVITY_TOPIC_READER»),Materialized.as («ACTIVITY_TOPIC_READER_STATE_STORE»)); `

2. Таким образом, раздел реестра схемы изменяется на stream-app-id-ACTIVITY_TOPIC_READER_STATE_STORE-key и stream-app-id-ACTIVITY_TOPIC_READER_STATE_STORE-value

Ответ №1:

Из блога, https://blog.jdriven.com/2019/12/kafka-streams-topologytestdriver-with-avro /

Когда вы настраиваете один и тот же макет:// URL в обоих свойствах, переданных в TopologyTestDriver, а также для экземпляров (де) сериализатора, переданных в createInputTopic и createOutputTopic , все (де) сериализаторы будут использовать один и тот же MockSchemaRegistryClient с одним хранилищем схем в памяти.

  // Configure Serdes to use the same mock schema registry URL
    Map<String, String> config = Map.of(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
    avroUserSerde.configure(config, false);
    avroColorSerde.configure(config, false);

    // Define input and output topics to use in tests
    usersTopic = testDriver.createInputTopic(
        "users-topic",
        stringSerde.serializer(),
        avroUserSerde.serializer());
    colorsTopic = testDriver.createOutputTopic(
        "colors-topic",
        stringSerde.deserializer(),
        avroColorSerde.deserializer());
 

Я не передавал макет URL-адреса клиентской схемы реестра в serdes, переданном в раздел ввода / вывода.