#apache-flink
#apache-flink
Вопрос:
У нас есть 3 java-pojos,
class Foo{
int id;
String name;
List<Bar1> list1;
List<Bar2> list2;
}
class Bar1{
int id;
String field_x;
String field_y;
}
class Bar2{
int id;
String field_a;
String field_b;
}
И у нас есть 3 потока данных в нашем задании Flink,
class Test{
public static void main(...){
DataStream<Foo> ds1 = ...;
DataStream<Bar1> ds2 = ...;
DataStream<Bar2> ds3 = ...;
}
}
Для каждого идентификатора будет только один объект Foo, в то время как объектов Bar1 и Bar2 может быть несколько.
Что мы хотим сделать, так это для каждого Foo в ds1 найти все Bar1 с одинаковым идентификатором в ds2 и поместить их в list1, найти все Bar2 с одинаковым идентификатором в ds3 и поместить их в list2.
Каков наилучший способ?
Ответ №1:
Операторы потока данных Flink поддерживают до двух входных потоков. Существует два распространенных способа реализации операций с тремя потоками:
- с помощью двух двоичных операций. В вашем случае это очень просто, поскольку
Bar1
иBar2
не связаны друг с другом. Это будет выглядеть примерно следующим образом:
DataStream<Foo> withList1 = ds1
.connect(ds2).keyBy("id", "id")
.process(
// your processing logic
new CoProcessFunction<Foo, Bar1, Foo>(){...});
DataStream<Foo> withList1AndList2 = withList1
.connect(ds3).keyBy("id", "id")
.process(
// your processing logic
new CoProcessFunction<Foo, Bar2, Foo>(){...});
- путем объединения всех трех потоков в один поток с общим типом данных (например, POJO с тремя полями
foo
,bar1
иbar2
, из которых используется только одно поле, и с использованием оператора с одним вводом для обработки объединенного потока.
// map Foo to CommonType
DataStream<CommonType> common1 = ds1.map(new MapFunction<Foo, CommonType>(){...});
// map Bar1 to CommonType
DataStream<CommonType> common2 = ds2.map(new MapFunction<Bar1, CommonType>(){...});
// map Bar2 to CommonType
DataStream<CommonType> common3 = ds3.map(new MapFunction<Bar2, CommonType>(){...});
DataStream<Foo> withList1AndList2 = ds1.union(ds2, ds3)
.keyBy("id")
.process(
// your processing logic
new KeyedProcessFunction<CommonType, Foo>(){...});
Вы также можете просто объединить ds2
и ds3
и использовать двоичный оператор.
Более серьезной проблемой может быть определение того, когда были получены все Bar1
и Bar2
события, чтобы вы могли выдать результат. Опять же, есть несколько вариантов (в зависимости от вашего варианта использования).
- если
Foo
известно, сколькоBar1
иBar2
ему нужно подождать, решение очевидно. - если
Foo
неизвестно, сколько событий нужно ждать, вы можете попробовать отправить уведомление, сигнализирующее о том, что было отправлено последнееBar1
илиBar2
. - вы также можете работать с тайм-аутом, если знаете, что все
Bar1
илиBar2
должны прибыть в течение x секунд / минут / и т.д.
Комментарии:
1. Спасибо за ответ, поскольку я совсем новичок в Flink, возможно ли, чтобы вы предоставили фрагмент кода, который был бы очень полезен.
2. Я добавил несколько фрагментов, чтобы сделать общий поток понятным
3. Привет, Фабиан, у меня все еще возникают проблемы с написанием «процесса (…)». Я выяснил, как это сделать при наличии временного окна, поскольку у него есть итератор, я могу пройти через итератор. Но я понятия не имею, что делать, когда нет временного окна. Не могли бы вы, пожалуйста, помочь привести какой-нибудь пример? (предположим, что все строки Bar1 или Bar2 должны поступить в течение 3 минут). Большое спасибо.
4. Идея состоит в том, чтобы использовать таймеры (через
Context
объект). Когда вы получаетеFoo
объект, вы сохраняете его в состоянии и регистрируете таймер на 3 минуты вперед. В течение этих 3 минут вы добавляете всеBar1
иBar2
объекты кFoo
объекту, т. Е. обновляете состояние. Через 3 минуты таймер вызываетonTimer()
метод обратного вызова, который генерируетFoo
объект и очищает состояние.5. Понял, я думаю, мне нужно изучить соответствующие государству знания. Еще раз спасибо!