#apache-nifi
#apache-nifi
Вопрос:
я использую версии MiNiFi 0.3 и NiFi 1.5.
у нас есть требование извлекать данные (csv) из папки «A» с помощью MiNiFi и отправлять в NiFi, работающий в Linux.
например, если файл поступает с 10 записями в 1 час ночи. нам нужно переместить (не копировать) файл из папки «A» в NiFi hub. Через 10 минут (1.10 утра) добавленный файл поступит с 10 старыми записями и 10 новыми записями. таким образом, всего он будет содержать 20 записей.
нам нужно отправить только новые 10 записей в NiFi hub.
я попробовал ListFile -> FetchFile, но поскольку нам нужно переместить данные. это не работает. затем я попробовал с процессором GetFile, но он захватывает все 20 записей.
есть ли какой-либо способ достичь сценария.
заранее спасибо.
Комментарии:
1. например, вы могли бы сохранить ее в файле
data.csv
->data.csv.timestamp
2. было бы полезно, если бы вы подробно объяснили. какой процессор и где data.csv должен присутствовать?
3. csv — это просто текстовый файл. чтобы отфильтровать строки — вы должны проанализировать его и использовать QueryRecord для удаления ненужных строк. в одном из полей должно быть что-то уникальное (например, временная метка), которое будет использоваться для фильтрации. и сохраните последнее значение этого уникального поля в отдельный файл вместо входящего файла, но с некоторым суффиксом.
4. как проанализировать и извлечь максимальную дату и время из csv. я использовал ExtractText processor, но не смог найти соответствующее регулярное выражение.
5. как прочитать файл data.csv после того, как я получу исходные данные из процессора ListFile?
Ответ №1:
Используя FetchFile
, вы можете настроить его с помощью свойства Completion Strategy
на Move File
или даже Delete File
(и затем вы можете PutFile
использовать его, когда захотите).
Комментарии:
1. Свойство стратегии завершения со значением Перемещения, позволяющее переместить файл в целевое местоположение. я хотел добиться того, чтобы тот же файл поступал с добавленными новыми записями. на этот раз я хочу переместить только новые записи. как этого добиться?
2. Может быть, извлечь файл, немного подождать и снова извлечь его, пока не будут доступны все записи?
3. наше требование заключается в том, что в файле будут обновляться записи, поступающие каждые 10 минут. И нам нужно обработать, как только появятся новые добавленные данные.
4. как насчет того же процесса, но затем ->
EvaluateJsonPath
с$.length()
свойством (для извлечения количества записей) ->RouteOnAttrribute
(чтобы проверить, сколько там записей) и таким образом решить, перемещать файл или нет5. у меня есть исходный файл в формате csv.