Игнорирование схемы JSON в ElasticsearchSinkConnector

#apache-kafka #apache-kafka-connect #jsonschema

Вопрос:

В настоящее время мы используем ElasticsearchSinkConnector в нашем кластере kafka-connect для перемещения данных из Kafka в Elastic. Мы используем JSON-схемы и регистр схем в нашей настройке.

В этом случае наши данные очень динамичны и, следовательно, могут быть определены только как a type: object в нашей JSON-схеме. При отправке следующего сообщения,

 {  data: {  propOne: "hi"    propTwo: "hi"  },  type: "Person", }  

со следующей схемой,

 {  required: ["data", "type"],  properties: {  "type": { type: "String" }  "data": { type: "Object" }  }, }  

мы сталкиваемся со следующей проблемой:

Наш ElasticSinkConnector получает только структуру подключения, содержащую a type с соответствующим строковым значением, но всегда пустой data объект. Это имеет смысл, так как, основываясь на схеме, это максимальная структура, которую он может вывести. Однако мы, конечно, хотели бы переместить весь исходный data объект в наш экземпляр elasticsearch, а не только пустой объект.

Это основной поток: введите описание изображения здесь

Соединитель приемника Elasticsearch поддерживает вывод данных в формате JSON (без схем) из разделов Apache Kafka. Но если мы используем org.apache.kafka.connect.json.JsonConverter конвертер, чтобы игнорировать схему, мы, конечно, получим проблему, заключающуюся в том, что байт subjectid не может быть интерпретирован. Таким образом, я ищу что-то подобное org.apache.kafka.connect.json.JsonConverter , что игнорирует subjectid и magicbyte, берет JSON, как бы он ни появлялся, и сбрасывает его в elastic.

Альтернативы, о которых мы знаем: мы, конечно, могли бы использовать JSON-событие без схемы для этого варианта использования/темы. Мы все еще хотели бы избежать этого, так как наш производитель в настоящее время не поддерживает это.

Комментарии:

1. Что произойдет, если вы установите schema.ignore=true конфигурацию соединителя?

2. Привет, Робин, к сожалению, ничего не меняется, так как я думаю, что schema.ignore при записи в elastic просто игнорируется схема соединителя-структуры-схемы. Но проблема в том, что IMHO уже на шаг раньше, так как данные никогда не попадают в структуру подключения после преобразования значения.