#apache-flink
#apache-flink
Вопрос:
Я использую управляемый неоконный поток. Попытка свести вышеупомянутые потоковые объекты в один, сведя оба значения к одному путем суммирования переменной в этом объекте и возврата этого объекта. Ниже приведен пример
Например, есть два объекта, object1 и object2, оба имеют переменную count .
object1 ->
name
count
object2 ->
name
count
Функция уменьшения неоконного потока с ключом Flink на самом деле не уменьшает и не возвращает один объект. Он возвращает оба объекта: object1 как есть и object2 с обновленным количеством
deDupedStream.keyBy(msg -> keyConstruct())
.reduce((ReduceFunction<Object>) (value1, value2) -> {
value2.setCount(value1.getCount() value2.getCount());
return value2;
})
вышеуказанные результаты:
actual:
value1 -> object with no change
value2 -> object with updated count
expectation:
value2 -> object with updated count
То, что я ожидал, вернет только обновленный объект, но не оба. Это то, что я предполагаю как уменьшение. Может кто-нибудь, пожалуйста, подсказать, как я могу вывести только обновленный?
Обновление: логика перед вышеупомянутым шагом агрегации: дедупликация.
SingleOutputStreamOperator<Object> deDupedStream = inputStream.keyBy(new DeDupKeyConstructor())
.timeWindow(Time.milliseconds(timeWindowInMillis))
.reduce(new DedupReduceFunction(), new DedupProcessFunction());
DedupeProcessFunction.java
public class DedupProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
Class1 obj = in.iterator().next();
obj.setWindowStartTime(context.window().getStart());
out.collect(obj);
}
}
DedupeReduceFunction.java
public class DedupReduceFunction implements ReduceFunction<Class1> {
@Override
public Class1 reduce(Class1 value1, Class1 value2) throws Exception {
return value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
}
}
Приведенный выше поток фактически собирает данные за 1 час и удаляет дубликаты среди них
Ниже я пытаюсь выполнить агрегацию. На самом деле я пытался, смогу ли я объединить обе логики в одно окно. Давайте также предположим, что оба конструктора ключей одинаковы. Если я попытаюсь объединить оба в одну оконную логику, я могу уменьшить количество операторов в задании. Вот как я начал играть, чтобы удалить одно окно и посмотреть, как оно себя ведет. Если это невозможно, как я могу объединить две логики в одну (предполагая, что логика группировки ключей почти одинакова (только одна переменная удаляется в aggregateKeyConstructor из DeDupKeyConstructor)).
SingleOutputStreamOperator<Object> aggregatedStream = deDupedStream
.keyBy(aggregateKeyConstructor())
.timeWindow(Time.milliseconds(timeWindowInMillis))
.reduce(new AggregateReduceFunction(), new AggregateProcessFunction());
AggregateProcessFunction.java
public class AggregateProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
out.collect(in.iterator().next());
}
}
AggregateReduceFunction.java
public class AggregateReduceFunction implements ReduceFunction<Class1> {
@Override
public Class1 reduce(Class1 value1, Class1 value2) {
Class1 value = value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
value.setCount(value1.getCount() value2.getCount());
return value;
}
}
Комментарии:
1. Это постепенное уменьшение: когда появляется первый элемент, его еще не с чем уменьшить, поэтому он выдается в качестве начального значения. Если вы ожидаете здесь одно значение, сколько вы ожидаете, когда нужно уменьшить три, четыре, … элемента, а не два?
2. Я по-прежнему ожидаю только один объект в качестве вывода, если в этой группе с ключом есть N объектов.
3. Я думаю, есть ли выбор не испускать первый объект, а только испускать обновленный объект. Я проверяю возможность без использования окна.
4. Если N объектов в одной и той же ключевой группе все равно должны приводить к единому выводу, возникает следующий логический вопрос: как вы узнаете, когда последний элемент ключевой группы прибыл в неограниченный поток, чтобы теперь вы могли выдать результат? Я думаю, вам нужно ответить на этот вопрос для вашего варианта использования, а затем реализовать пользовательскую расширенную функцию, которая выполняет агрегацию.
5. Теперь я понимаю, о чем вы говорите. Я не могу предоставить весь код, возможно, это было не нужно. Но с вашим вопросом я должен включить эту логику. Позвольте мне уточнить вопрос о том, что я делаю перед этой агрегацией