Kafka подключает JsonConverter с проверкой

#java #apache-kafka #apache-kafka-connect

#java #apache-kafka #apache-kafka-connect

Вопрос:

Я пытаюсь создать конвертер значений kafka connect, который обертывает недопустимые записи json допустимым объектом json.

Я считываю значения из kinesis (используя KinesisSourceConnector), поэтому входные данные находятся в кодировке base64.

Моя реализация пытается обработать входные данные через ByteArrayConverter, который декодирует данные и делегирует выходные данные JsonConverter следующим образом (decode инициализируется в методе configure как true):

 private final Converter delegate = new JsonConverter();
    private final Converter decoder = new ByteArrayConverter();
    private boolean decode = false;

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        try {
            String decoded = new String(decoder.fromConnectData(topic, schema, value));
            LOG.info("decoded stringn"   decoded);

            if(decode) {
                byte[] bytes = decoder.fromConnectData(topic, schema, value);
                return delegate.fromConnectData(topic, schema, bytes);
            }
            return delegate.fromConnectData(topic, schema, value);
        } catch (Exception e) {
            LOG.error("something went wrong", e);
            return delegate.fromConnectData(topic, schema, wrapInvalidJson(new String(decoder.fromConnectData(topic, schema, value))));
        }
    }
  

Когда я печатаю декодированную строку, она выглядит нормально (декодированная строка json)

Но когда я использую тему вывода, она снова выглядит как base64, и я не уверен, чего мне не хватает

Ответ №1:

Не уверен, что это оптимально, но пошел на этот подход

     private final Converter delegate = new JsonConverter();
    private final Converter decoder = new ByteArrayConverter();
    private final Converter stringConverter = new StringConverter();
    private final ObjectMapper mapper = new ObjectMapper();
    private boolean decode = false;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(Collections.singletonMap("schemas.enable", false), false);
        if (configs.containsKey("ni.decode.data") amp;amp; Boolean.valueOf((String) configs.get("ni.decode.data"))) {
            decode = true;
        }
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {

        if (decode) {
            String decoded = new String(decoder.fromConnectData(topic, schema, value));
            try {
                return mapper.readTree(decoded).toString().getBytes();
            } catch (Exception e) {
                return wrapInvalidJson(decoded).getBytes();
            }
        } else {
            try {
                return delegate.fromConnectData(topic, schema, value);
            } catch (Exception e) {
                byte[] msg = stringConverter.fromConnectData(topic, schema, value);
                return wrapInvalidJson(new String(msg)).getBytes();
            }
        }
    }