Фильтрация с учетом состояния / flatMapValues в потоках Kafka?

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

Я пытаюсь написать простое приложение Kafka Streams (ориентированное на Kafka 2.2 / Confluent 5.2), чтобы преобразовать входную тему с семантикой «по крайней мере один раз» в выходной поток «ровно один раз». Я хотел бы закодировать следующую логику:

  • Для каждого сообщения с заданным ключом:
    • Считывает временную метку сообщения из строкового поля в значении сообщения
    • Извлеките наибольшую временную метку, которую мы ранее видели для этого ключа, из локального хранилища состояний
      • Если временная метка сообщения меньше или равна временной метке в хранилище состояний, ничего не выдавайте
      • Если временная метка больше, чем временная метка в хранилище состояний, или ключ не существует в хранилище состояний, отправьте сообщение и обновите хранилище состояний ключом / меткой времени сообщения

(Это гарантированно обеспечивает правильные результаты, основанные на гарантиях упорядочения, которые мы получаем от вышестоящей системы; я не пытаюсь сделать здесь ничего волшебного.)

Сначала я думал, что смогу сделать это с помощью оператора потоков Kafka flatMapValues , который позволяет сопоставлять каждое входное сообщение с нулем или более выходных сообщений с одним и тем же ключом. Однако эта документация явно предупреждает:

Это операция записи записи без состояния (см. transformValues(ValueTransformerSupplier, String …) для преобразования значения с сохранением состояния).

Это звучит многообещающе, но в transformValues документации не ясно, как выдавать ноль или одно выходное сообщение на входное сообщение. Если только это не то, что // or null пытается сказать в стороне в примере?

flatTransform также выглядело несколько многообещающе, но мне не нужно манипулировать ключом, и, если возможно, я бы хотел избежать перераспределения.

Кто-нибудь знает, как правильно выполнять такую фильтрацию?

Комментарии:

1. Примечание: flatTransformValues() добавляется в потоки Kafka и, скорее всего, будет доступна в следующей версии Kafka, которая является версией v2.3. См cwiki.apache.org/confluence/display/KAFKA /… на случай, если вам понадобится больше деталей. Сегодня это вам не поможет, но, возможно, вы захотите запомнить это на потом.

Ответ №1:

вы могли бы использовать Transformer для реализации операций с сохранением состояния, как вы описали выше. Чтобы не распространять сообщение по потоку, вам необходимо вернуть null from transform метод, упомянутый в Transformer java doc . И вы могли бы управлять распространением через processorContext.forward(key, value) . Упрощенный пример приведен ниже

kStream.transform(() -> new DemoTransformer(stateStoreName), stateStoreName)

 public class DemoTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext processorContext;
    private String stateStoreName;
    private KeyValueStore<String, String> keyValueStore;

    public DemoTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        String existingValue = keyValueStore.get(key);
        if (/* your condition */) {
            processorContext.forward(key, value);
            keyValueStore.put(key, value);
        }

        return null;
    }

    @Override
    public void close() {
    }
}
 

Комментарии:

1. Спасибо! Так что нормально transform всегда возвращать null и передавать все данные через побочные эффекты на ProcessorContext ? Я не могу сказать из документов forward , но работает ли это без перераспределения выходных данных?

2. это нормально использовать только для передачи данных внутри. processorContext.forward transform вывод будет в том же разделе.