Как создать несколько файлов потока из одного файла входящего потока в nifi, используя executeScript с python

#python #apache-nifi #execute-script #flowfile

#python #apache-nifi #execute-script #flowfile

Вопрос:

При локальном запуске это работает именно так, как я хочу (имеет один файл входящего потока с множеством разных кодов в позиции 7-10 и выводит 1 файл на уникальный код) Например, если запись 1-5 имеет 1234 в позициях 7-10, а запись 6 имеет 2345 в позиции 7-10, а запись 7 имеет 1234 в позициях 7-10, тогда будет один файл с именем 1234_file.txt со строками 1-5 и 7 и вторым файлом 2345_file.txt будет иметь строку 6 из входного файла:

 f = open("test_comp.txt", "r")
for x in f:
    comp = x[6:10]
    print(comp)
    n = open(comp "_file.txt","a")
    n.write(x)
    n.close()
f.close()
  

В nifi я пытаюсь это:

 from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    f = open(inputStream, 'r')
    for x in f:
        comp = x[6:10]
        print("comp: ",comp)
        newFile = open(comp "_file.txt","a")
        newFile.write(x)


flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
session.commit()
  

Кажется, он получает входные данные и правильно сохраняет comp в позиции 7-10, как и ожидалось, но я не получаю несколько файлов потока (для каждой уникальной строки в x[6: 10]. И выходящий файл потока представляет собой файл с 1 нулевым байтом.

Есть мысли о том, чего мне не хватает??

Ответ №1:

Вы записываете непосредственно в файлы в своей файловой системе, а не в потоковые файлы, которые являются объектами в экосистеме NiFi. Я бы посоветовал прочитать Руководство разработчика Apache NiFi для контекста этих шаблонов и посмотреть некоторые примеры Python executeScript, чтобы увидеть соответствующий код Python.

Вместо того, чтобы записывать один потоковый файл, вам нужно будет создать несколько объектов потокового файла, сопоставить с ними данные, а затем перенести их все в соответствующие отношения.

Есть ли причина, по которой вам нужно использовать пользовательский код Python для этого, а не SplitRecord процессоры and / or PartitionRecord ? Я думаю PartitionRecord , что это очень легко решит проблему, которую вы описали.

Комментарии:

1. Спасибо, я прочитаю предложенные вами статьи. Что касается PartitionRecord, какая служба контроллера будет работать, поскольку мой файл представляет собой просто плоский файл (не csv)?

2. Можете ли вы предоставить несколько примеров строк? CSVReader можно настроить для анализа многих файлов с разделителями, даже если они не являются запятыми, табуляциями или каналами

3. Я согласен с Мэттом, если входящие данные «плоские», CSVReader вероятно, это лучшее решение с использованием настраиваемого разделителя. Если ни одна из служб контроллера OOTB не работает для вас, есть Scripted также вариант.

4. 470120123829 2000004590}00000051 3212 057 YVKB 20125 00 999999999999901 HG04070316 123 Main street, US 99999 470220123729 2000091230}00000051 2012 056 Y2HF 27258 00 999999999999901 HG04070316 123 Main street, US 99999

5. Есть две примерные строки. Оба они предназначены для того, чтобы оказаться в разных файлах потока, потому что у одного 1238 в позиции 7-10, а у другого 1237 в позиции 7-10.