Как использовать параметры контекста в пользовательском процессоре Java и модульном тестировании

#apache-nifi

#apache-nifi

Вопрос:

Я создал пробный процессор Nifi на Java с 2 дескрипторами свойств, каждый из которых предназначен для чтения параметра контекста

    public static final PropertyDescriptor
  CACHE_KEY =
  property( "last-dt-processed-key" )
    .description( "References a context property which holds the cache key used for the last date time processed" )
    .required( true )
    .addValidator( NON_EMPTY_VALIDATOR )
    .defaultValue( "#{last-dt-processed-key}" )
    .build( );

public static final PropertyDescriptor
  PROCESS_FROM_DAYS_AGO_DEFAULT =
  property( "process-from-days-ago-default" )
    .description( "References a context property which holds the default number of days ago to start processing from" )
    .required( true )
    .addValidator( NON_EMPTY_VALIDATOR )
    .defaultValue( "#{process-from-days-ago-default}" )
    .build( );
 

Процессор получает значения этих параметров контекста, регистрирует их и добавляет как атрибуты в файл исходящего потока

 @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if ( flowFile == null ) {
        return;
    }

    final String cacheKey = context.getProperty( CACHE_KEY ).evaluateAttributeExpressions( ).getValue( );
    final String daysAgo =  context.getProperty( PROCESS_FROM_DAYS_AGO_DEFAULT ).getValue();

    getLogger()
      .info( String.format( "parameters cache key '%s' days ago %s with attributes %s" ,
        cacheKey ,
        daysAgo ,
        flowFile.getAttributes()
      )
    );

    flowFile = session.putAttribute(flowFile, "added-cache-key", cacheKey );
    flowFile = session.putAttribute(flowFile, "added-days-ago", daysAgo );

    session.transfer( flowFile , SUCCESS);
 

Обратите внимание, что я использую 2 варианта доступа к свойству, с помощью и без evaluateAttributeExpressions( ) . Поскольку это обе работы, мне неясно, следует ли мне их использовать.

2020-12-10 10:43:18 396 ИНФОРМАЦИЯ [Поток процесса, управляемый таймером-3] c.s.n.p.a. adwords.MyParameterProcessor MyParameterProcessor[id=49716825-0176-1000-7c82-90d5d4d51e14] ключ кэша параметров ‘ssd-uploader-last-datetime-processed’ дней назад 30 с атрибутами {absolute.path=C:nifinifi-1.12.1data.in /, путь=/, filename=ssd-extract.txt , файл.LastModifiedTime=2020-12-08T09:42:06 1300, file.CreationTime=2020-12-10T10:39:21 1300, file.Последнее время доступа =2020-12-10T10:39:21 1300, файл.owner= xxxxx, uuid= bc0ec9c5-7f82-40db-888f-ec581aeadb98}

 Key: 'added-cache-key' 
Value: 'ssd-uploader-last-datetime-processed'
Key: 'added-days-ago'
Value: '30'
 

Теперь, как я могу это проверить (я бы предпочел сначала создать тест, но у меня не работают тесты !!)? У меня есть это

 @Test void testMyParameterProcessor() {

    final MockComponentLog logger = new MockComponentLog( "1234" , this  );

    final TestRunner runner = TestRunners.newTestRunner( MyParameterProcessor.class , logger );

    runner.setValidateExpressionUsage( false );

    // this works
    runner.setProperty( CACHE_KEY.getName() , "ssd-uploader-last-datetime-processed" );
    runner.setProperty( PROCESS_FROM_DAYS_AGO_DEFAULT.getName() , "3" );


    final MockFlowFile flowFile = runner.enqueue( "Hello world!!" .getBytes( ) , Collections.emptyMap( ) );

    runner.run();
    flowFile.assertAttributeEquals( "added-cach-key" , "ssd-uploader-last-datetime-processed" );
    flowFile.assertAttributeEquals( "added-days-ago" , "30" );
    runner.assertAllFlowFilesTransferred( SUCCESS, 1 );
}
 

Без этой строки, которую я только что обнаружил (я не заметил ее в исключении и не документирован),

 runner.setValidateExpressionUsage( false )
 

значения свойств не могут быть восстановлены, получите это исключение

 java.lang.AssertionError: java.lang.IllegalStateException: Attempting to Evaluate Expressions but PropertyDescriptor[last-dt-processed-key] indicates that the Expression Language is not supported. If you realize that this is the case and do not want this error to occur, it can be disabled by calling TestRunner.setValidateExpressionUsage(false)
 

С помощью этой строки свойства регистрируются

 MyParameterProcessorUTests@487ed8ff parameters cache key 'ssd-uploader-last-datetime-processed' days ago 3 with attributes {path=target, filename=188603694525600.mockFlowFile, uuid=f02f0e04-7901-4460-9fcf-8e1734cd0999}
 

Но атрибуты не добавляются в файл потока. Я использую nifi 1.12.1.

Любой совет или указатель на аналогичный пример теста будут приняты с благодарностью.

Комментарии:

1. Кроме того, я опробовал платформу Nifi integration testing Framework на основе scala, которую я нахожу очень хорошей, но у нее также были

Ответ №1:

Кроме того, я опробовал платформу Nifi integration testing framework plumber на основе scala, которую я нахожу очень хорошей, но, похоже, она также не обрабатывала контекстные параметры pl.touk plumber

Я пересмотрел это после распознавания ответа runner.setValidateExpressionUsage (false) выше. То же самое можно сделать в plumber, включив validateExpressions = false при добавлении узла: все работает, включая атрибуты, добавляемые в файл потока. К вашему сведению, вот весь тест

 class ParameterProcessorITests extends FunSuite with Matchers {

   private val emptyAttributes: Map[String,String] = Map( )

   test("test my parameter processor " ){

val flow = new NifiFlowBuilder()

  .addNode(name = "pp",
    processor = new MyParameterProcessor,
    properties = Map(
      MyParameterProcessor.CACHE_KEY.getName-> "ssd-uploader-last-datetime-processed",
      MyParameterProcessor.PROCESS_FROM_DAYS_AGO_DEFAULT.getName->"3"
    ) ,
    validateExpressions = false
  )

  .addNode("log-message", new LogMessage, emptyAttributes )
  .addNode("log-attribute", new LogAttribute, emptyAttributes )

  .addInputConnection("pp")
  .addConnection("pp", "log-message", MyParameterProcessor.SUCCESS )
  .addConnection("pp", "log-attribute", MyParameterProcessor.SUCCESS )
  .addOutputConnection("pp", MyParameterProcessor.SUCCESS )

  .build()

flow.enqueue(
  "anything".getBytes,
  emptyAttributes
)
flow.run()

val results = flow.executionResult.outputFlowFiles
results.foreach(file => println( new String( file.toByteArray ) ) )
results should have size 1
results.head.assertAttributeEquals("added-cache-key", "ssd-uploader-last-datetime-processed" )
results.head.assertAttributeEquals("added-days-ago", "3" )
 

}
}