#apache-kafka #spring-cloud #apache-kafka-streams #spring-cloud-stream
#apache-kafka #spring-cloud #apache-kafka-streams #spring-cloud-stream
Вопрос:
Это мое приложение, которое просто получает ссылку на KStream из темы клиента (привязка ввода) и другую из темы заказа (привязка заказа). Затем он создает KTable из темы клиента и выполняет соединение с order KStream:
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") order: KStream<Int, Order>): KStream<Int, OrderShipped> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderSerde = JsonSerde<Order>(Order::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (order.selectKey { key, value -> value.customerId } as KStream<Int, Order>)
.join(customerTable, { orderIt, customer ->
OrderShipped(orderIt.id)
},
Joined.with(intSerde, orderSerde, customerSerde))
}
}
Предположительно, это должна быть запись в output binding ( @SendTo("output")
) , указывающая на тему ordershipment. Однако в эту тему не записывается никаких сообщений.
Конфигурация процессора:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
@Input("order")
fun order(): KStream<String, Order>
@Input("output")
fun output(): KStream<String, OrderShipped>
}
**application.yml**
spring:
application:
name: spring-boot-shipping-service
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
destination: customer
contentType: application/json
order:
destination: order
contentType: application/json
output:
destination: ordershipments
contentType: application/json
Ответ №1:
Процессор был определен неправильно, это хороший вариант, использующий @Output
вместо @Input
:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
@Input("order")
fun order(): KStream<String, Order>
@Output("output")
fun output(): KStream<String, OrderShipped>
}