NiFi: как сохранить максимальную временную метку при использовании процессора ListFile / GetFile?

#apache-nifi

#apache-nifi

Вопрос:

я использую версии MiNiFi 0.3 и NiFi 1.5.

у нас есть требование извлекать данные (csv) из папки «A» с помощью MiNiFi и отправлять в NiFi, работающий в Linux.

например, если файл поступает с 10 записями в 1 час ночи. нам нужно переместить (не копировать) файл из папки «A» в NiFi hub. Через 10 минут (1.10 утра) добавленный файл поступит с 10 старыми записями и 10 новыми записями. таким образом, всего он будет содержать 20 записей.

нам нужно отправить только новые 10 записей в NiFi hub.

я попробовал ListFile -> FetchFile, но поскольку нам нужно переместить данные. это не работает. затем я попробовал с процессором GetFile, но он захватывает все 20 записей.

есть ли какой-либо способ достичь сценария.

заранее спасибо.

Комментарии:

1. например, вы могли бы сохранить ее в файле data.csv -> data.csv.timestamp

2. было бы полезно, если бы вы подробно объяснили. какой процессор и где data.csv должен присутствовать?

3. csv — это просто текстовый файл. чтобы отфильтровать строки — вы должны проанализировать его и использовать QueryRecord для удаления ненужных строк. в одном из полей должно быть что-то уникальное (например, временная метка), которое будет использоваться для фильтрации. и сохраните последнее значение этого уникального поля в отдельный файл вместо входящего файла, но с некоторым суффиксом.

4. как проанализировать и извлечь максимальную дату и время из csv. я использовал ExtractText processor, но не смог найти соответствующее регулярное выражение.

5. как прочитать файл data.csv после того, как я получу исходные данные из процессора ListFile?

Ответ №1:

Используя FetchFile , вы можете настроить его с помощью свойства Completion Strategy на Move File или даже Delete File (и затем вы можете PutFile использовать его, когда захотите).

Комментарии:

1. Свойство стратегии завершения со значением Перемещения, позволяющее переместить файл в целевое местоположение. я хотел добиться того, чтобы тот же файл поступал с добавленными новыми записями. на этот раз я хочу переместить только новые записи. как этого добиться?

2. Может быть, извлечь файл, немного подождать и снова извлечь его, пока не будут доступны все записи?

3. наше требование заключается в том, что в файле будут обновляться записи, поступающие каждые 10 минут. И нам нужно обработать, как только появятся новые добавленные данные.

4. как насчет того же процесса, но затем -> EvaluateJsonPath с $.length() свойством (для извлечения количества записей) -> RouteOnAttrribute (чтобы проверить, сколько там записей) и таким образом решить, перемещать файл или нет

5. у меня есть исходный файл в формате csv.