Соединитель Kafka Confluent cloud ElasticsearchSink: сопоставление столкновения в индексе

#elasticsearch #apache-kafka #confluent-platform #confluent-cloud

#elasticsearch #apache-kafka #confluent-платформа #confluent-cloud

Вопрос:

У меня Kafka работает в confluent cloud, где я могу создавать данные с помощью Node.js клиенту данные отправляются в виде строки, и я получаю следующие поля в confluent cloud.введите описание изображения здесь

Затем я создал ElasticsearchSink Connector и подключил его к облаку поиска elastic. Если я не создаю никаких сопоставлений в эластичном поиске, передача данных проходит успешно как исключенная, и формат выглядит примерно так.

 "_source" : {
          "booked" : false,
          "phone_number" : "919191919191",
          "location" : {
            "lon" : 60.23,
            "lat" : 78.233
          }
        }
  

Теперь проблема в том, что если я захочу запустить какой geo queries -либо, он не позволит мне и выдаст следующую ошибку:

 "root_cause" : [
      {
        "type" : "query_shard_exception",
        "reason" : "failed to find geo_point field [location]",
        "index_uuid" : "C8Xxu9QlTMKN4Lk1LjpOmQ",
        "index" : "locations"
      }
  

Причина в том, что динамическое отображение не поддерживает geo_field. Итак, теперь, когда я пытаюсь создать пользовательское сопоставление для эластичного поиска при создании индекса следующим образом:

 PUT /locations
{
  "mappings": {
    "properties": {
      "phone_number": {
        "type": "text"
      },
      "booked": {
        "type": "boolean"
      },
      "location": {
        "type": "geo_point"
        }
      }
  }
}
  

Затем confluent connector выходит из строя и показывает следующую ошибку:

 There is a mapping collision in your index: Can't merge a non object mapping with an object mapping.
  

Я также пробовал booked как text поле, но, похоже, ничего не работает. Я не применял никакой схемы в Confluent cloud.
Вот несколько базовых конфигураций из confluent cloud.
введите описание изображения здесь

Как я могу обеспечить сопоставление, чтобы я мог работать geo queries в эластичном поиске?

ОБНОВЛЕНИЕ: эта проблема сохраняется в основном из-за формата данных, которые отправляются в Kafka

 {
    "phone_number": "919191919191",
    "location": {
            "lat": 78.233,
            "lon": 60.23
    },
    "booked": false,
}

{
    "phone_number": " 919191919190",
    "location": " 78.233, 60.23",
    "booked": false,
}
  

Оба формата не сопоставляются с указанным выше отображением ElasticSearch и connector sink показывают следующую ошибку:

 Received Illegal Argument Exception from Elasticsearch: One of your fields' type does not match the mapped type in Elasticsearch
  

Комментарии:

1. Вы пытаетесь добавить географическое отображение к уже динамически отображаемому местоположению поля?

2. Нет, сначала я создал индекс, а затем добавил сопоставление, но это не сработало. И я также попытался добавить географические поля в динамически отображаемое поле. Но ни один из них не сработал.

Ответ №1:

Confluent Cloud выполняет скрытую работу по выяснению вещей, связанных со схемой, и сохраняет это обнаружение во встроенном реестре схем. Я боюсь, что ваши сопоставления не будут работать, потому что:

  1. Соединитель все еще пытается отправить предыдущие сохраненные данные.
  2. Предыдущие сохраненные данные по-прежнему привязаны к старой схеме.
  3. Confluent Cloud не осознавал, что схема эволюционировала.

Попробуйте сбросить настройки, создав новую среду в Confluent Cloud (что приведет к принудительному созданию нового экземпляра SR) или, возможно, используйте совершенно новую тему Kafka. В любом случае начните с свежих данных. Соединитель всегда старается быть оптимистичным и следить за тем, чтобы данные не терялись, но в процессе это может быть ошибкой, поскольку схема эволюционировала.

Установите сопоставление в Elasticsearch перед чем-либо. Как только вы это сделаете, соединитель будет сопоставлен с правильной схемой. Кроме того, по какой-то причине это работало только тогда, когда я использовал динамические шаблоны для сопоставления для индекса Elasticsearch.