Как использовать сообщения из темы Debezium из Quarkus?

#quarkus #debezium

#quarkus #debezium

Вопрос:

Я пытаюсь настроить приложение, которое производит события изменения с помощью MySQL Debezium Kafka. Я хотел бы использовать сообщения из темы Debezium с помощью приложения Quarkus Microprofile. Я использую следующую конфигурацию на стороне Quarkus для захвата входящих сообщений:

 mp.messaging.incoming.customers.connector=smallrye-kafka
mp.messaging.incoming.customers.topic=dbserver1.inventory.customers
mp.messaging.incoming.customers.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  

Это работает, однако событие изменения, когда оно записывается с помощью StringDeserializer, содержит не только измененную запись:

 {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1005,"first_name":"myname","last_name":"myusername","email":"amail@mail.com"},"source":{"version":"1.3.0.Final","connector":"mysql","name":"dbserver1","ts_ms":1603634203000,"snapshot":"false","db":"inventory","table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":364,"row":0,"thread":6,"query":null},"op":"c","ts_ms":1603634203419,"transaction":null}} 
  

Как я могу извлечь измененные данные из этого огромного JSON?
что в моем случае:

 {"id":1005,"first_name":"myname","last_name":"myusername","email":"amail@mail.com"}
  

Должен ли я продолжать использовать StringDeserializer и использовать JSONB и выполнять итерации по полезной нагрузке JSON? или есть лучшее решение?

Ответ №1:

Однако я не думаю, что для этого есть лучший подход, поскольку текст представляет собой JSON, использование пользовательского десериализатора, который расширяет, будет ли работать JsonbDeserializer:

 @RegisterForReflection
public class CustomerDeserializer extends JsonbDeserializer<Customer> {

    @Override
    public Customer deserialize(String topic, byte[] data) {
        JsonReader reader = Json.createReader(new StringReader(new String(data)));
        JsonObject jsonObject = reader.readObject();

        JsonObject payload = jsonObject.getJsonObject("payload");
        String firstName = payload.getJsonObject("after").getString("first_name");
        String lastName = payload.getJsonObject("after").getString("last_name");
        String email = payload.getJsonObject("after").getString("email");
        return new Customer(firstName,lastName,email);

    }
}
  

Редактировать: вы можете найти полный пример Debezium здесь.