#elasticsearch #apache-kafka #confluent-platform #confluent-cloud
#elasticsearch #apache-kafka #confluent-платформа #confluent-cloud
Вопрос:
У меня Kafka работает в confluent cloud, где я могу создавать данные с помощью Node.js клиенту данные отправляются в виде строки, и я получаю следующие поля в confluent cloud.
Затем я создал ElasticsearchSink Connector
и подключил его к облаку поиска elastic. Если я не создаю никаких сопоставлений в эластичном поиске, передача данных проходит успешно как исключенная, и формат выглядит примерно так.
"_source" : {
"booked" : false,
"phone_number" : "919191919191",
"location" : {
"lon" : 60.23,
"lat" : 78.233
}
}
Теперь проблема в том, что если я захочу запустить какой geo queries
-либо, он не позволит мне и выдаст следующую ошибку:
"root_cause" : [
{
"type" : "query_shard_exception",
"reason" : "failed to find geo_point field [location]",
"index_uuid" : "C8Xxu9QlTMKN4Lk1LjpOmQ",
"index" : "locations"
}
Причина в том, что динамическое отображение не поддерживает geo_field. Итак, теперь, когда я пытаюсь создать пользовательское сопоставление для эластичного поиска при создании индекса следующим образом:
PUT /locations
{
"mappings": {
"properties": {
"phone_number": {
"type": "text"
},
"booked": {
"type": "boolean"
},
"location": {
"type": "geo_point"
}
}
}
}
Затем confluent connector выходит из строя и показывает следующую ошибку:
There is a mapping collision in your index: Can't merge a non object mapping with an object mapping.
Я также пробовал booked
как text
поле, но, похоже, ничего не работает. Я не применял никакой схемы в Confluent cloud.
Вот несколько базовых конфигураций из confluent cloud.
Как я могу обеспечить сопоставление, чтобы я мог работать geo queries
в эластичном поиске?
ОБНОВЛЕНИЕ: эта проблема сохраняется в основном из-за формата данных, которые отправляются в Kafka
{
"phone_number": "919191919191",
"location": {
"lat": 78.233,
"lon": 60.23
},
"booked": false,
}
{
"phone_number": " 919191919190",
"location": " 78.233, 60.23",
"booked": false,
}
Оба формата не сопоставляются с указанным выше отображением ElasticSearch
и connector sink
показывают следующую ошибку:
Received Illegal Argument Exception from Elasticsearch: One of your fields' type does not match the mapped type in Elasticsearch
Комментарии:
1. Вы пытаетесь добавить географическое отображение к уже динамически отображаемому местоположению поля?
2. Нет, сначала я создал индекс, а затем добавил сопоставление, но это не сработало. И я также попытался добавить географические поля в динамически отображаемое поле. Но ни один из них не сработал.
Ответ №1:
Confluent Cloud выполняет скрытую работу по выяснению вещей, связанных со схемой, и сохраняет это обнаружение во встроенном реестре схем. Я боюсь, что ваши сопоставления не будут работать, потому что:
- Соединитель все еще пытается отправить предыдущие сохраненные данные.
- Предыдущие сохраненные данные по-прежнему привязаны к старой схеме.
- Confluent Cloud не осознавал, что схема эволюционировала.
Попробуйте сбросить настройки, создав новую среду в Confluent Cloud (что приведет к принудительному созданию нового экземпляра SR) или, возможно, используйте совершенно новую тему Kafka. В любом случае начните с свежих данных. Соединитель всегда старается быть оптимистичным и следить за тем, чтобы данные не терялись, но в процессе это может быть ошибкой, поскольку схема эволюционировала.
Установите сопоставление в Elasticsearch перед чем-либо. Как только вы это сделаете, соединитель будет сопоставлен с правильной схемой. Кроме того, по какой-то причине это работало только тогда, когда я использовал динамические шаблоны для сопоставления для индекса Elasticsearch.