Flink уменьшает управляемый неоконный поток

#apache-flink

#apache-flink

Вопрос:

Я использую управляемый неоконный поток. Попытка свести вышеупомянутые потоковые объекты в один, сведя оба значения к одному путем суммирования переменной в этом объекте и возврата этого объекта. Ниже приведен пример

Например, есть два объекта, object1 и object2, оба имеют переменную count .

 object1 -> 
    name
    count
object2 ->
    name
    count
 

Функция уменьшения неоконного потока с ключом Flink на самом деле не уменьшает и не возвращает один объект. Он возвращает оба объекта: object1 как есть и object2 с обновленным количеством

 deDupedStream.keyBy(msg -> keyConstruct())
.reduce((ReduceFunction<Object>) (value1, value2) -> {
    value2.setCount(value1.getCount()   value2.getCount());
    return value2;
})
 

вышеуказанные результаты:

 actual:
value1 -> object with no change
value2 -> object with updated count

expectation:
value2 -> object with updated count
 

То, что я ожидал, вернет только обновленный объект, но не оба. Это то, что я предполагаю как уменьшение. Может кто-нибудь, пожалуйста, подсказать, как я могу вывести только обновленный?

Обновление: логика перед вышеупомянутым шагом агрегации: дедупликация.

 SingleOutputStreamOperator<Object> deDupedStream = inputStream.keyBy(new DeDupKeyConstructor())
                .timeWindow(Time.milliseconds(timeWindowInMillis))
                .reduce(new DedupReduceFunction(), new DedupProcessFunction());
 

DedupeProcessFunction.java

 public class DedupProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
        Class1 obj = in.iterator().next();
        obj.setWindowStartTime(context.window().getStart());
        out.collect(obj);
    }
}
 

DedupeReduceFunction.java

 public class DedupReduceFunction implements ReduceFunction<Class1> {
    @Override
    public Class1 reduce(Class1 value1, Class1 value2) throws Exception {
        return value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
    }
}
 

Приведенный выше поток фактически собирает данные за 1 час и удаляет дубликаты среди них

Ниже я пытаюсь выполнить агрегацию. На самом деле я пытался, смогу ли я объединить обе логики в одно окно. Давайте также предположим, что оба конструктора ключей одинаковы. Если я попытаюсь объединить оба в одну оконную логику, я могу уменьшить количество операторов в задании. Вот как я начал играть, чтобы удалить одно окно и посмотреть, как оно себя ведет. Если это невозможно, как я могу объединить две логики в одну (предполагая, что логика группировки ключей почти одинакова (только одна переменная удаляется в aggregateKeyConstructor из DeDupKeyConstructor)).

 SingleOutputStreamOperator<Object> aggregatedStream = deDupedStream
.keyBy(aggregateKeyConstructor())
.timeWindow(Time.milliseconds(timeWindowInMillis))
.reduce(new AggregateReduceFunction(), new AggregateProcessFunction());
 

AggregateProcessFunction.java

 public class AggregateProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
        out.collect(in.iterator().next());
    }
}
 

AggregateReduceFunction.java

 public class AggregateReduceFunction implements ReduceFunction<Class1> {
    @Override
    public Class1 reduce(Class1 value1, Class1 value2) {
        Class1 value =  value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
        value.setCount(value1.getCount()   value2.getCount());
        return value;
    }
}
 

Комментарии:

1. Это постепенное уменьшение: когда появляется первый элемент, его еще не с чем уменьшить, поэтому он выдается в качестве начального значения. Если вы ожидаете здесь одно значение, сколько вы ожидаете, когда нужно уменьшить три, четыре, … элемента, а не два?

2. Я по-прежнему ожидаю только один объект в качестве вывода, если в этой группе с ключом есть N объектов.

3. Я думаю, есть ли выбор не испускать первый объект, а только испускать обновленный объект. Я проверяю возможность без использования окна.

4. Если N объектов в одной и той же ключевой группе все равно должны приводить к единому выводу, возникает следующий логический вопрос: как вы узнаете, когда последний элемент ключевой группы прибыл в неограниченный поток, чтобы теперь вы могли выдать результат? Я думаю, вам нужно ответить на этот вопрос для вашего варианта использования, а затем реализовать пользовательскую расширенную функцию, которая выполняет агрегацию.

5. Теперь я понимаю, о чем вы говорите. Я не могу предоставить весь код, возможно, это было не нужно. Но с вашим вопросом я должен включить эту логику. Позвольте мне уточнить вопрос о том, что я делаю перед этой агрегацией