#java #apache-kafka #apache-kafka-connect
#java #apache-kafka #apache-kafka-connect
Вопрос:
Я пытаюсь создать конвертер значений kafka connect, который обертывает недопустимые записи json допустимым объектом json.
Я считываю значения из kinesis (используя KinesisSourceConnector), поэтому входные данные находятся в кодировке base64.
Моя реализация пытается обработать входные данные через ByteArrayConverter, который декодирует данные и делегирует выходные данные JsonConverter следующим образом (decode инициализируется в методе configure как true):
private final Converter delegate = new JsonConverter();
private final Converter decoder = new ByteArrayConverter();
private boolean decode = false;
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
try {
String decoded = new String(decoder.fromConnectData(topic, schema, value));
LOG.info("decoded stringn" decoded);
if(decode) {
byte[] bytes = decoder.fromConnectData(topic, schema, value);
return delegate.fromConnectData(topic, schema, bytes);
}
return delegate.fromConnectData(topic, schema, value);
} catch (Exception e) {
LOG.error("something went wrong", e);
return delegate.fromConnectData(topic, schema, wrapInvalidJson(new String(decoder.fromConnectData(topic, schema, value))));
}
}
Когда я печатаю декодированную строку, она выглядит нормально (декодированная строка json)
Но когда я использую тему вывода, она снова выглядит как base64, и я не уверен, чего мне не хватает
Ответ №1:
Не уверен, что это оптимально, но пошел на этот подход
private final Converter delegate = new JsonConverter();
private final Converter decoder = new ByteArrayConverter();
private final Converter stringConverter = new StringConverter();
private final ObjectMapper mapper = new ObjectMapper();
private boolean decode = false;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
delegate.configure(Collections.singletonMap("schemas.enable", false), false);
if (configs.containsKey("ni.decode.data") amp;amp; Boolean.valueOf((String) configs.get("ni.decode.data"))) {
decode = true;
}
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
if (decode) {
String decoded = new String(decoder.fromConnectData(topic, schema, value));
try {
return mapper.readTree(decoded).toString().getBytes();
} catch (Exception e) {
return wrapInvalidJson(decoded).getBytes();
}
} else {
try {
return delegate.fromConnectData(topic, schema, value);
} catch (Exception e) {
byte[] msg = stringConverter.fromConnectData(topic, schema, value);
return wrapInvalidJson(new String(msg)).getBytes();
}
}
}