#apache-kafka #spring-cloud #avro #apache-kafka-streams #spring-cloud-stream
#apache-kafka #spring-cloud #avro #apache-kafka-streams #spring-cloud-stream
Вопрос:
Предположим, что приложение Spring Cloud Stream создает a KStream
из an order topic
. Его интересуют OrderCreated {"id":x, "productId": y, "customerId": z}
события. Как только один из них поступает, он обрабатывает его и генерирует для него выходное событие OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z}
order topic
.
Проблема, с которой я сталкиваюсь, заключается в том, что, поскольку он читает и записывает из / в одну и ту же тему, потоковое приложение Kafka пытается обработать свои собственные записи, что не имеет смысла.
Как я могу запретить этому приложению обрабатывать события, которые оно генерирует?
ОБНОВЛЕНИЕ: Как отмечают Артем Билан и собичако, я рассматривал возможность использования KStream.filter()
, но есть некоторые детали, которые заставляют меня сомневаться в том, как с этим бороться:
Прямо сейчас приложение KStream выглядит следующим образом:
interface ShippingKStreamProcessor {
...
@Input("order")
fun order(): KStream<String, OrderCreated>
@Output("output")
fun output(): KStream<String, OrderShipped>
Конфигурация KStream
@StreamListener
@SendTo("output")
fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {
Как привязки заказа, так и привязки вывода указывают на тему заказа в качестве назначения.
OrderCreated класс:
data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
constructor() : this(null, null, null)
}
Заказанный класс
data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
constructor() : this(null, null, null, null)
}
Я использую JSON в качестве формата сообщения, поэтому сообщения выглядят следующим образом:
- ПОРЯДОК ВВОДА создан:
{"id":1, "productId": 7,"customerId": 20}
- ВЫХОДНЫЕ ДАННЫЕ — отправленный заказ:
{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}
Я ищу наилучший подход для фильтрации нежелательных сообщений, учитывая это:
Если я просто использую KStream.filter()
прямо сейчас, когда я получу {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}
my KStream<Int, OrderCreated>
, я отменю отправленное событие как созданный объект OrderCreated с некоторыми пустыми полями : OrderCreated(id:1, productId: 7, customerId: null)
. Проверка нулевых полей не кажется надежной.
Возможным решением может быть добавление другого поля, eventType = OrderCreated|OrderShipped
, к каждому виду сообщения / класса, который использует эту тему. Даже в этом случае у меня будет класс OrderCreated (помните KStream< Int,OrderCreated > ) с атрибутом EventType=OrderShipped . Это выглядит как уродливый обходной путь. Есть идеи по ее улучшению?
Есть ли другой, более автоматический способ справиться с этим? Например, будет ли другой вид сериализации (AVRO?) Предотвращать обработку сообщений, если они не соответствуют ожидаемой схеме (OrderCreated)? Согласно этой статье, этот способ поддержки нескольких схем (типов событий) в одной теме представляется хорошей практикой: https://www.confluent.io/blog/put-several-event-types-kafka-topic / Однако неясно, как отменить / десериализовать разные типы.
Комментарии:
1. Почему
KStream.filter()
у вас это не работает? Поскольку все находится в теме Кафки,OrderShipped
они по-прежнему будут доступны для других пользователей по этой теме.2. Как упоминал @ArtemBilan, это должно быть что-то, чем можно управлять с помощью
filter
. Если вы можете поделиться еще каким-нибудь кодом, мы можем взглянуть.3. Я обновил вопрос с более подробной информацией
Ответ №1:
Я принял ответ Бруно как действительный способ решения этой проблемы. Однако я думаю, что я придумал более простой / логичный способ, используя иерархию событий JsonTypeInfo
, аннотированных с.
Сначала вам нужен базовый класс для упорядочивания событий и указания всех подклассов. Обратите внимание, что в документ JSON будет добавлено свойство type, которое поможет Джексону маршалировать / отменять DTO:
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent
data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
constructor() : this(null, null, null)
}
data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
constructor() : this(null, null, null, null)
}
При этом производитель объектов OrderCreatedEvent сгенерирует сообщение, подобное этому:
key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}
Теперь очередь KStream. Я изменил подпись на KStream<Int, OrderEvent>
, поскольку она может получать OrderCreatedEvent или OrderShippedEvent. В следующих двух строках…
orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
… Я фильтрую, чтобы сохранить только сообщения класса OrderCreatedEvent, и сопоставляю их для преобразования KStream<Int, OrderEvent>
в KStream<Int, OrderCreatedEvent>
Полная логика KStream:
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
//.to("order", Produced.with(intSerde, orderShippedSerde))
}
После этого процесса я создаю новое сообщение key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"}
в теме заказа, но оно будет отфильтровано потоком.
Комментарии:
1. Для меня это работает нормально, если известны все типы событий. Однако, есть предложения, как обрабатывать / игнорировать события неизвестного типа? Например, может быть
OrderDeletedEvent
сообщение, которое должно быть проигнорировано службой поддержки потребителей, поскольку оно не имеет отношения к делу. В этом случае я не хочу включать дополнительный, ненужныйOrderDeletedEvent
класс. Одним из решений было бы зарегистрировать ошибку и продолжить обработку путем настройкиorg.apache.kafka.streams.errors.LogAndContinueExceptionHandler
, но это означает, что все исключения десериализации игнорируются, что может быть нежелательно.
Ответ №2:
Вы могли бы использовать заголовки записей Kafka для хранения типа записи. См. КИП-82. Вы можете установить заголовки внутри ProducerRecord
.
Обработка будет выглядеть следующим образом:
- Прочитайте
stream
типKStream<Integer, Bytes>
со значением serdeSerdes.BytesSerde
из темы. - Используется
KStream#transformValues()
для фильтрации и создания объектов. Более конкретно, внутриtransformValues()
вы можете получить доступProcessorContext
к тому, что дает вам доступ к заголовкам записей, которые содержат информацию о типе записи. Затем:- Если тип есть
OrderShipped
, верните значение returnnull
. - В противном случае создайте
OrderCreated
объект изBytes
объекта и верните его.
- Если тип есть
Для решения с помощью AVRO вы можете ознакомиться со следующими документами
Комментарии:
1. Я думаю, что ваше решение может сработать нормально, но я сомневаюсь в этом, transformValues(ValueTransformerSupplier<? super V,? расширяет VR> valueTransformerSupplier, java.lang. String … stateStoreNames) — это операция с отслеживанием состояния, и она ожидает имена хранилищ состояний в методе преобразования, необходимо ли передавать имя хранилища или его можно опустить…
2. Хранилище состояний можно опустить.