Как игнорировать некоторые виды сообщений в приложении Kafka Streams, которое считывает и записывает разные типы событий из одной и той же темы

#apache-kafka #spring-cloud #avro #apache-kafka-streams #spring-cloud-stream

#apache-kafka #spring-cloud #avro #apache-kafka-streams #spring-cloud-stream

Вопрос:

Предположим, что приложение Spring Cloud Stream создает a KStream из an order topic . Его интересуют OrderCreated {"id":x, "productId": y, "customerId": z} события. Как только один из них поступает, он обрабатывает его и генерирует для него выходное событие OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z} order topic .

Проблема, с которой я сталкиваюсь, заключается в том, что, поскольку он читает и записывает из / в одну и ту же тему, потоковое приложение Kafka пытается обработать свои собственные записи, что не имеет смысла.

Как я могу запретить этому приложению обрабатывать события, которые оно генерирует?

ОБНОВЛЕНИЕ: Как отмечают Артем Билан и собичако, я рассматривал возможность использования KStream.filter() , но есть некоторые детали, которые заставляют меня сомневаться в том, как с этим бороться:

Прямо сейчас приложение KStream выглядит следующим образом:

 interface ShippingKStreamProcessor {
    ...
    @Input("order")
    fun order(): KStream<String, OrderCreated>

    @Output("output")
    fun output(): KStream<String, OrderShipped>
 

Конфигурация KStream

     @StreamListener
    @SendTo("output")
    fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {

 

Как привязки заказа, так и привязки вывода указывают на тему заказа в качестве назначения.

OrderCreated класс:

 data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
    constructor() : this(null, null, null)
}
 

Заказанный класс

 data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
    constructor() : this(null, null, null, null)
}
 

Я использую JSON в качестве формата сообщения, поэтому сообщения выглядят следующим образом:

  • ПОРЯДОК ВВОДА создан: {"id":1, "productId": 7,"customerId": 20}
  • ВЫХОДНЫЕ ДАННЫЕ — отправленный заказ: {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}

Я ищу наилучший подход для фильтрации нежелательных сообщений, учитывая это:

Если я просто использую KStream.filter() прямо сейчас, когда я получу {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"} my KStream<Int, OrderCreated> , я отменю отправленное событие как созданный объект OrderCreated с некоторыми пустыми полями : OrderCreated(id:1, productId: 7, customerId: null) . Проверка нулевых полей не кажется надежной.

Возможным решением может быть добавление другого поля, eventType = OrderCreated|OrderShipped , к каждому виду сообщения / класса, который использует эту тему. Даже в этом случае у меня будет класс OrderCreated (помните KStream< Int,OrderCreated > ) с атрибутом EventType=OrderShipped . Это выглядит как уродливый обходной путь. Есть идеи по ее улучшению?

Есть ли другой, более автоматический способ справиться с этим? Например, будет ли другой вид сериализации (AVRO?) Предотвращать обработку сообщений, если они не соответствуют ожидаемой схеме (OrderCreated)? Согласно этой статье, этот способ поддержки нескольких схем (типов событий) в одной теме представляется хорошей практикой: https://www.confluent.io/blog/put-several-event-types-kafka-topic / Однако неясно, как отменить / десериализовать разные типы.

Комментарии:

1. Почему KStream.filter() у вас это не работает? Поскольку все находится в теме Кафки, OrderShipped они по-прежнему будут доступны для других пользователей по этой теме.

2. Как упоминал @ArtemBilan, это должно быть что-то, чем можно управлять с помощью filter . Если вы можете поделиться еще каким-нибудь кодом, мы можем взглянуть.

3. Я обновил вопрос с более подробной информацией

Ответ №1:

Я принял ответ Бруно как действительный способ решения этой проблемы. Однако я думаю, что я придумал более простой / логичный способ, используя иерархию событий JsonTypeInfo , аннотированных с.

Сначала вам нужен базовый класс для упорядочивания событий и указания всех подклассов. Обратите внимание, что в документ JSON будет добавлено свойство type, которое поможет Джексону маршалировать / отменять DTO:

 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
    JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
    JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent

data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
    constructor() : this(null, null, null)
}

data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
    constructor() : this(null, null, null, null)
}
 

При этом производитель объектов OrderCreatedEvent сгенерирует сообщение, подобное этому:

key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}

Теперь очередь KStream. Я изменил подпись на KStream<Int, OrderEvent> , поскольку она может получать OrderCreatedEvent или OrderShippedEvent. В следующих двух строках…

 orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
 

… Я фильтрую, чтобы сохранить только сообщения класса OrderCreatedEvent, и сопоставляю их для преобразования KStream<Int, OrderEvent> в KStream<Int, OrderCreatedEvent>

Полная логика KStream:

 @StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = JsonSerde<Customer>(Customer::class.java)
        val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)


        return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
                .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
                .join(customerTable, { orderIt, customer ->
                    OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
                }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
                .selectKey { _, value -> value.id }
                //.to("order", Produced.with(intSerde, orderShippedSerde))
    }
 

После этого процесса я создаю новое сообщение key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"} в теме заказа, но оно будет отфильтровано потоком.

Комментарии:

1. Для меня это работает нормально, если известны все типы событий. Однако, есть предложения, как обрабатывать / игнорировать события неизвестного типа? Например, может быть OrderDeletedEvent сообщение, которое должно быть проигнорировано службой поддержки потребителей, поскольку оно не имеет отношения к делу. В этом случае я не хочу включать дополнительный, ненужный OrderDeletedEvent класс. Одним из решений было бы зарегистрировать ошибку и продолжить обработку путем настройки org.apache.kafka.streams.errors.LogAndContinueExceptionHandler , но это означает, что все исключения десериализации игнорируются, что может быть нежелательно.

Ответ №2:

Вы могли бы использовать заголовки записей Kafka для хранения типа записи. См. КИП-82. Вы можете установить заголовки внутри ProducerRecord .

Обработка будет выглядеть следующим образом:

  1. Прочитайте stream тип KStream<Integer, Bytes> со значением serde Serdes.BytesSerde из темы.
  2. Используется KStream#transformValues() для фильтрации и создания объектов. Более конкретно, внутри transformValues() вы можете получить доступ ProcessorContext к тому, что дает вам доступ к заголовкам записей, которые содержат информацию о типе записи. Затем:
    • Если тип есть OrderShipped , верните значение return null .
    • В противном случае создайте OrderCreated объект из Bytes объекта и верните его.

Для решения с помощью AVRO вы можете ознакомиться со следующими документами

Комментарии:

1. Я думаю, что ваше решение может сработать нормально, но я сомневаюсь в этом, transformValues(ValueTransformerSupplier<? super V,? расширяет VR> valueTransformerSupplier, java.lang. String … stateStoreNames) — это операция с отслеживанием состояния, и она ожидает имена хранилищ состояний в методе преобразования, необходимо ли передавать имя хранилища или его можно опустить…

2. Хранилище состояний можно опустить.