#apache-kafka #apache-kafka-streams
#apache-kafka #apache-kafka-streams
Вопрос:
Я пытаюсь написать простое приложение Kafka Streams (ориентированное на Kafka 2.2 / Confluent 5.2), чтобы преобразовать входную тему с семантикой «по крайней мере один раз» в выходной поток «ровно один раз». Я хотел бы закодировать следующую логику:
- Для каждого сообщения с заданным ключом:
- Считывает временную метку сообщения из строкового поля в значении сообщения
- Извлеките наибольшую временную метку, которую мы ранее видели для этого ключа, из локального хранилища состояний
- Если временная метка сообщения меньше или равна временной метке в хранилище состояний, ничего не выдавайте
- Если временная метка больше, чем временная метка в хранилище состояний, или ключ не существует в хранилище состояний, отправьте сообщение и обновите хранилище состояний ключом / меткой времени сообщения
(Это гарантированно обеспечивает правильные результаты, основанные на гарантиях упорядочения, которые мы получаем от вышестоящей системы; я не пытаюсь сделать здесь ничего волшебного.)
Сначала я думал, что смогу сделать это с помощью оператора потоков Kafka flatMapValues
, который позволяет сопоставлять каждое входное сообщение с нулем или более выходных сообщений с одним и тем же ключом. Однако эта документация явно предупреждает:
Это операция записи записи без состояния (см. transformValues(ValueTransformerSupplier, String …) для преобразования значения с сохранением состояния).
Это звучит многообещающе, но в transformValues
документации не ясно, как выдавать ноль или одно выходное сообщение на входное сообщение. Если только это не то, что // or null
пытается сказать в стороне в примере?
flatTransform
также выглядело несколько многообещающе, но мне не нужно манипулировать ключом, и, если возможно, я бы хотел избежать перераспределения.
Кто-нибудь знает, как правильно выполнять такую фильтрацию?
Комментарии:
1. Примечание: flatTransformValues() добавляется в потоки Kafka и, скорее всего, будет доступна в следующей версии Kafka, которая является версией v2.3. См cwiki.apache.org/confluence/display/KAFKA /… на случай, если вам понадобится больше деталей. Сегодня это вам не поможет, но, возможно, вы захотите запомнить это на потом.
Ответ №1:
вы могли бы использовать Transformer
для реализации операций с сохранением состояния, как вы описали выше. Чтобы не распространять сообщение по потоку, вам необходимо вернуть null
from transform
метод, упомянутый в Transformer
java doc . И вы могли бы управлять распространением через processorContext.forward(key, value)
. Упрощенный пример приведен ниже
kStream.transform(() -> new DemoTransformer(stateStoreName), stateStoreName)
public class DemoTransformer implements Transformer<String, String, KeyValue<String, String>> {
private ProcessorContext processorContext;
private String stateStoreName;
private KeyValueStore<String, String> keyValueStore;
public DemoTransformer(String stateStoreName) {
this.stateStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
}
@Override
public KeyValue<String, String> transform(String key, String value) {
String existingValue = keyValueStore.get(key);
if (/* your condition */) {
processorContext.forward(key, value);
keyValueStore.put(key, value);
}
return null;
}
@Override
public void close() {
}
}
Комментарии:
1. Спасибо! Так что нормально
transform
всегда возвращать null и передавать все данные через побочные эффекты наProcessorContext
? Я не могу сказать из документовforward
, но работает ли это без перераспределения выходных данных?2. это нормально использовать только для передачи данных внутри.
processorContext.forward
transform
вывод будет в том же разделе.