Процессор Kafka не сохраняет состояние атрибутов потокового файла

#apache-nifi #apache-minifi

#apache-nifi #apache-minifi

Вопрос:

Я обновляю несколько атрибутов flowfile и помещаю их в kafka, но когда я использую то же самое из процессора consumekafka_2.0, эти атрибуты теряются. Это не поддерживается? Нужно ли мне настраивать этот процессор?

Когда я увидел приведенный ниже исходный код процессора, я понял, что он уже считывает атрибуты из записи и записывает их в flowfile, тогда почему они недоступны в flowfile?

 private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
        FlowFile flowFile = session.create();
        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
        tracker.incrementRecordCount(1);
        final byte[] value = record.value();
        if (value != null) {
            flowFile = session.write(flowFile, out -> {
                out.write(value);
            });
        }
        flowFile = session.putAllAttributes(flowFile, getAttributes(record));
        tracker.updateFlowFile(flowFile);
        populateAttributes(tracker);
        session.transfer(tracker.flowFile, REL_SUCCESS);
    }
  

Комментарии:

1. Похоже, что вы пишете в Kafka с помощью скрипта и пытаетесь прочитать его с помощью NiFi. Чтобы сузить проблему, пожалуйста, попробуйте записать некоторые атрибуты из Nifi в Kafka и снова прочитать их в Nifi.

2. @DennisJaheruddin сделал то же самое .. добавил атрибут с помощью evaluatejsonpath -> putkafka, затем ConsumeKafka-> logattribute

Ответ №1:

Для передачи атрибутов вы должны использовать заголовки Kafka, в противном случае невозможно передать атрибуты, поскольку они не являются частью тела файла потока, который станет телом сообщения в Kafka.

На стороне публикации PublishKafka_2_0 имеет следующее свойство, чтобы указать, какие атрибуты отправлять в качестве заголовков:

 static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
        .name("attribute-name-regex")
        .displayName("Attributes to Send as Headers (Regex)")
        .description("A Regular Expression that is matched against all FlowFile attribute names. "
              "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
              "If not specified, no FlowFile attributes will be added as headers.")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
        .required(false)
        .build();
  

На стороне потребления, ConsumeKafka_2_0 имеет следующее свойство, чтобы указать, какие поля заголовка добавлять в качестве атрибутов:

 static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
        .name("header-name-regex")
        .displayName("Headers to Add as Attributes (Regex)")
        .description("A Regular Expression that is matched against all message headers. "
              "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
              "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
              "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
              "".*" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
              "the messages together efficiently.")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
        .required(false)
        .build();
  

Комментарии:

1. У меня есть атрибут с именем testid, при размещении его в kafka я использовал publishkafka_2 с «Атрибутами для отправки в виде заголовков (Regex)» в качестве теста.* и использовал consumekafka_2.0, но все еще не смог получить testid в качестве атрибута в flowfile

2. Установлен ли у вас «Демаркатор сообщений»? Вы можете получать значения заголовка, только если вы выполняете 1 сообщение kafka на 1 файл потока, что означает, что у вас не может быть демаркатора

3. После установки того же заголовка в ConsumeKafka это сработало. Спасибо