Приложение Spring Cloud Stream Kafka не генерирует сообщения с правильной схемой Avro

#apache-kafka #apache-kafka-streams #spring-cloud-stream #confluent-platform #confluent-schema-registry

#apache-kafka #apache-kafka-streams #spring-cloud-stream #confluent-платформа #confluent-schema-registry

Вопрос:

У меня есть приложение (spring-boot-shipping-service) с KStream, которое получает сообщения OrderCreatedEvent, сгенерированные внешним производителем (spring-boot-order-service). Этот производитель использует следующую схему:

order-created-event.avsc

 {
  "namespace" : "com.codependent.statetransfer.order",
  "type" : "record",
  "name" : "OrderCreatedEvent",
  "fields" : [
    {"name":"id","type":"int"},
    {"name":"productId","type":"int"},
    {"name":"customerId","type":"int"}
  ]
}
  

My KStream<Int, OrderCreatedEvent> объединяется с KTable<Int, Customer> и публикует в разделе заказа новый вид сообщения: OrderShippedEvent.

заказ-отправлено-событие.avsc

 {
  "namespace" : "com.codependent.statetransfer.order",
  "type" : "record",
  "name" : "OrderShippedEvent",
  "fields" : [
    {"name":"id","type":"int"},
    {"name":"productId","type":"int"},
    {"name":"customerName","type":"string"},
    {"name":"customerAddress","type":"string"}
  ]
}
  

По какой-то причине новые сообщения OrderShippedEvent генерируются не с заголовком, application/vnd.ordershippedevent.v1 avro но application/vnd.ordercreatedevent.v1 avro .

Это исходное OrderCreatedEvent в разделе order:

 Key (4 bytes):  
  Value (4 bytes): V?
  Timestamp: 1555943926163
  Partition: 0
  Offset: 34
  Headers: contentType="application/vnd.ordercreatedevent.v1 avro",spring_json_header_types={"contentType":"java.lang.String"}
  

И созданное OrderShippedEvent с неправильной схемой:

 Key (4 bytes):  
  Value (26 bytes): V?
JamesHill Street
  Timestamp: 1555943926163
  Partition: 0
  Offset: 35
  Headers: contentType="application/vnd.ordercreatedevent.v1 avro",spring_json_header_types={"contentType":"java.lang.String"}
  

Я проверил содержимое реестра Confluent Schema, и схема order-shipped-event.avsc есть:

введите описание изображения здесь

Почему он не использует правильную shema в сгенерированном сообщении?

Ниже вы можете увидеть полную конфигурацию и код примера, который также доступен на Github (https://github.com/codependent/event-carried-state-transfer/tree/avro )

Чтобы протестировать его, просто запустите Confluent Platform (v5.2.1), spring-boot-customer-service, spring-boot-order-service, spring-boot-shipping-service и выполните следующие команды curl:

curl -X POST http://localhost:8080/customers -d '{"id":1,"name":"James","address":"Hill Street"}' -H "content-type: application/json"

curl -X POST http://localhost:8084/orders -H "content-type: application/json" -d '{"id":1,"productId":1001,"/customerId":1}'

application.yml

 server:
  port: 8085

spring:
  application:
    name: spring-boot-shipping-service
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
      bindings:
        input:
          destination: customer
          contentType: application/* avro
        order:
          destination: order
          contentType: application/* avro
        output:
          destination: order
          contentType: application/* avro
      schema-registry-client:
        endpoint: http://localhost:8081
  

ShippingKStreamProcessor

 interface ShippingKStreamProcessor {

    @Input("input")
    fun input(): KStream<Int, Customer>

    @Input("order")
    fun order(): KStream<String, OrderCreatedEvent>

    @Output("output")
    fun output(): KStream<String, OrderShippedEvent>
  

ShippingKStreamConfiguration

     @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderCreatedEvent>): KStream<Int, OrderShippedEvent> {

        val serdeConfig = mapOf(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = SpecificAvroSerde<Customer>()
        customerSerde.configure(serdeConfig, true)
        val orderCreatedSerde = SpecificAvroSerde<OrderCreatedEvent>()
        orderCreatedSerde.configure(serdeConfig, true)
        val orderShippedSerde = SpecificAvroSerde<OrderShippedEvent>()
        orderShippedSerde.configure(serdeConfig, true)


        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)

        return (orderEvent.filter { _, value -> value is OrderCreatedEvent amp;amp; value.id != 0 }
                .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
                .join(customerTable, { orderIt, customer ->
                    OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
                }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
                .selectKey { _, value -> value.id }
    }
  

UPDATE: I’ve set trace logging level for org.springframework.messaging and apparently it looks ok:

 2019-04-22 23:40:39.953 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : HTTP GET http://localhost:8081/subjects/ordercreatedevent/versions/1
2019-04-22 23:40:39.971 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Accept=[application/json, application/* json]
2019-04-22 23:40:39.972 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Writing [] as "application/vnd.schemaregistry.v1 json"
2019-04-22 23:40:39.984 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Response 200 OK
2019-04-22 23:40:39.985 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.186  INFO 46039 --- [read-1-producer] org.apache.kafka.clients.Metadata        : Cluster ID: 5Sw6sBD0TFOaximF3Or-dQ
2019-04-22 23:40:40.318 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Obtaining schema for class class com.codependent.statetransfer.order.OrderShippedEvent
2019-04-22 23:40:40.318 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Avro type detected, using schema from object
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : HTTP POST http://localhost:8081/subjects/ordershippedevent/versions
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Accept=[application/json, application/* json]
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Writing [{"schema":"{"type":"record","name":"OrderShippedEvent","namespace":"com.codependent.statetransfer.order","fields":[{"name":"id","type":"int"},{"name":"productId","type":"int"},{"name":"customerName","type":{"type":"string","avro.java.string":"String"}},{"name":"customerAddress","type":{"type":"string","avro.java.string":"String"}}]}"}] as "application/json"
2019-04-22 23:40:40.348 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Response 200 OK
2019-04-22 23:40:40.348 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : HTTP POST http://localhost:8081/subjects/ordershippedevent
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Accept=[application/json, application/* json]
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Writing [{"schema":"{"type":"record","name":"OrderShippedEvent","namespace":"com.codependent.statetransfer.order","fields":[{"name":"id","type":"int"},{"name":"productId","type":"int"},{"name":"customerName","type":{"type":"string","avro.java.string":"String"}},{"name":"customerAddress","type":{"type":"string","avro.java.string":"String"}}]}"}] as "application/json"
2019-04-22 23:40:40.361 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Response 200 OK
2019-04-22 23:40:40.362 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate              : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.362 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Finding correct DatumWriter for type com.codependent.statetransfer.order.OrderShippedEvent
  

How come the message is written with an incorrect content type header then?

UPDATE 2:

I’ve kept digging into the source code and found this:

  1. KafkaStreamsMessageConversionDelegate correctly converts and determines the right header values, as seen in the logs above.

  2. Однако в методе serializeOnOutbound мы можем обнаружить, что он возвращает в API Kafka только полезную нагрузку, поэтому заголовки не учитываются:

 return
                    messageConverter.toMessage(message.getPayload(),
                            messageHeaders).getPayload();
  
  1. При продвижении вперед при обработке записей org.apache.kafka.streams.processor.internals.SinkNode.process() обращаются к заголовкам, присутствующим в контексте, которые неправильно содержат application/vnd.ordercreatedevent.v1 avro вместо application/vnd.ordershippedevent.v1 avro (?):
 collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);

  

ОБНОВЛЕНИЕ 3:

Шаги для воспроизведения:

  1. Загрузите и запустите Confluent 5.2.1 confluent start

  2. Запустите приложения spring-boot-order-service, spring-boot-customer-service, spring-boot-shipping-service

  3. Создайте клиента curl -X POST http://localhost:8080/customers -d '{"id":1,"name":"John","address":"Some Street"}' -H "content-type: application/json"

  4. Создайте заказ, который будет объединен с клиентом: curl -X POST http://localhost:8084/orders -H "content-type: application/json" -d '{"id":1,"productId":1,"customerId":1}'

  5. ShippingKStreamConfiguration process() создаст KTable для клиента и хранилище состояний (customer-store). Кроме того, оно присоединится к потоку заказов с помощью customer KTable для преобразования OrderCreatedEvent в OrderShippedEvent.

  6. Вы можете проверить, что вновь созданное сообщение OrderShippedEvent, добавленное в раздел заказа, имеет неправильный заголовок. Это можно увидеть либо в Confluent Control Center ( localhost:9092 -> topics -> order ), либо при запуске kafkacat:

 $> kafkacat -b localhost:9092 -t order -C 
  -f 'nKey (%K bytes): %k   
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %hn'

  

Комментарии:

1. Похоже, вы добились значительного прогресса в решении этой проблемы. Я могу взглянуть на это завтра и вернуться к вам. Тем временем, если вы найдете дополнительную информацию, пожалуйста, обновите здесь.

2. Я не вижу spring-boot-order-service в репозитории. Я пытаюсь воспроизвести проблему. Не могли бы вы указать точные шаги для запуска вашей системы и увидеть проблему?

3. Проект находится в ветке avro репозитория: github.com/codependent/event-carried-state-transfer/tree/avro -> github.com/codependent/event-carried-state-transfer/tree/avro /…

4. Обновлено с шагами для воспроизведения. Спасибо, что взглянули на это!!

Ответ №1:

@codependent Это действительно проблема, которую нам нужно решить в binder, которую мы скоро исправим. В то же время, в качестве обходного пути вы можете заставить свой процессор не возвращать a KStream , а выполнять отправку в самом методе. Вы можете вызвать to(TopicNameExtractor) текущее возвращаемое KStream . TopicNameExtractor предоставит вам доступ к контексту записи, с помощью которого вы можете вручную задать тип содержимого.

Комментарии:

1. Вот ссылка на проблему: github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues /…