#apache-nifi
Вопрос:
В настоящее время я использую Apache Nifi для передачи данных в Elasticsearch. Данные отправляются по одному индексу, но я хочу динамически перенаправлять данные по разным индексам в зависимости от содержимого client_id
поля.
У меня есть QueryRecord
процессор, который выполняет некоторую фильтрацию перед отправкой в Elasticsearch или удалением. Устанавливается QueryProcessor
с JsonTreeReader
помощью и JsonRecordWriter
Как бы я мог настроить параметр индекса PutElasticSearchRecord
, чтобы использовать содержимое поля client_id
в качестве имени индекса?
Ответ №1:
Вы должны иметь возможность использовать Index Record Path
свойство и указать путь к записи поля индекса.
Ответ №2:
PutElasticsearchRecord имеет два варианта определения индекса.
- Вы можете задать
Index
свойство, которое может быть либо статическим значением, либо использовать язык выражений для извлечения значения из атрибутов. Вы можете использовать PartitionRecord для разделения записей в потоковые файлы по значениюclient_id
. - Вы можете установить
Index Record Path
, какой путь к записи будет использоваться для извлечения значения из содержимого записи. Путь к записи был/client_id
бы тем, что вам нужно в вашем случае.
Комментарии:
1. Спасибо вам за ваш комментарий. Полагаю, я все еще немного не в себе в этом вопросе. Если я оставлю
index
поле пустым и установлюIndex Record Path
/client_id
значение, которое автоматически создаст индексы на основе значения/client_id
? Если бы у меня было два поля, которые я решил объединить, чтобы создать индекс, такой как/client_id
/admin_level
, смог бы я все равно добавить это вIndex Record Path
подобное${/client_id}-${admin_level}
?2. Настройка
Index Record Path
заполнит индекс любым значением/client_id
, равным. Автоматическое создание индексов обрабатывается ES, поэтому вам нужно убедиться, что ES настроен на автоматическое создание индексов. Если вы хотите объединить два поля, вам все равно нужно будет использовать пути записи и использоватьconcat()
функцию — напримерconcat(/client_id, '-', /admin_level)