Как управлять 3 потоками данных в задании Flink?

#apache-flink

#apache-flink

Вопрос:

У нас есть 3 java-pojos,

 class Foo{
 int id;
 String name;
 List<Bar1> list1;
 List<Bar2> list2;
}

class Bar1{
 int id;
 String field_x;
 String field_y;
}

class Bar2{
 int id;
 String field_a;
 String field_b;
}
  

И у нас есть 3 потока данных в нашем задании Flink,

 class Test{
 public static void main(...){
  DataStream<Foo> ds1 = ...;
  DataStream<Bar1> ds2 = ...;
  DataStream<Bar2> ds3 = ...;
 }
}
  

Для каждого идентификатора будет только один объект Foo, в то время как объектов Bar1 и Bar2 может быть несколько.

Что мы хотим сделать, так это для каждого Foo в ds1 найти все Bar1 с одинаковым идентификатором в ds2 и поместить их в list1, найти все Bar2 с одинаковым идентификатором в ds3 и поместить их в list2.

Каков наилучший способ?

Ответ №1:

Операторы потока данных Flink поддерживают до двух входных потоков. Существует два распространенных способа реализации операций с тремя потоками:

  1. с помощью двух двоичных операций. В вашем случае это очень просто, поскольку Bar1 и Bar2 не связаны друг с другом. Это будет выглядеть примерно следующим образом:
 DataStream<Foo> withList1 = ds1
  .connect(ds2).keyBy("id", "id")
  .process(
    // your processing logic
    new CoProcessFunction<Foo, Bar1, Foo>(){...});
DataStream<Foo> withList1AndList2 = withList1
  .connect(ds3).keyBy("id", "id")
  .process(
    // your processing logic
    new CoProcessFunction<Foo, Bar2, Foo>(){...});
  
  1. путем объединения всех трех потоков в один поток с общим типом данных (например, POJO с тремя полями foo , bar1 и bar2 , из которых используется только одно поле, и с использованием оператора с одним вводом для обработки объединенного потока.
 // map Foo to CommonType
DataStream<CommonType> common1 = ds1.map(new MapFunction<Foo, CommonType>(){...}); 
// map Bar1 to CommonType
DataStream<CommonType> common2 = ds2.map(new MapFunction<Bar1, CommonType>(){...});
// map Bar2 to CommonType
DataStream<CommonType> common3 = ds3.map(new MapFunction<Bar2, CommonType>(){...});

DataStream<Foo> withList1AndList2 = ds1.union(ds2, ds3)
  .keyBy("id")
  .process(
    // your processing logic
    new KeyedProcessFunction<CommonType, Foo>(){...});
  

Вы также можете просто объединить ds2 и ds3 и использовать двоичный оператор.

Более серьезной проблемой может быть определение того, когда были получены все Bar1 и Bar2 события, чтобы вы могли выдать результат. Опять же, есть несколько вариантов (в зависимости от вашего варианта использования).

  1. если Foo известно, сколько Bar1 и Bar2 ему нужно подождать, решение очевидно.
  2. если Foo неизвестно, сколько событий нужно ждать, вы можете попробовать отправить уведомление, сигнализирующее о том, что было отправлено последнее Bar1 или Bar2 .
  3. вы также можете работать с тайм-аутом, если знаете, что все Bar1 или Bar2 должны прибыть в течение x секунд / минут / и т.д.

Комментарии:

1. Спасибо за ответ, поскольку я совсем новичок в Flink, возможно ли, чтобы вы предоставили фрагмент кода, который был бы очень полезен.

2. Я добавил несколько фрагментов, чтобы сделать общий поток понятным

3. Привет, Фабиан, у меня все еще возникают проблемы с написанием «процесса (…)». Я выяснил, как это сделать при наличии временного окна, поскольку у него есть итератор, я могу пройти через итератор. Но я понятия не имею, что делать, когда нет временного окна. Не могли бы вы, пожалуйста, помочь привести какой-нибудь пример? (предположим, что все строки Bar1 или Bar2 должны поступить в течение 3 минут). Большое спасибо.

4. Идея состоит в том, чтобы использовать таймеры (через Context объект). Когда вы получаете Foo объект, вы сохраняете его в состоянии и регистрируете таймер на 3 минуты вперед. В течение этих 3 минут вы добавляете все Bar1 и Bar2 объекты к Foo объекту, т. Е. обновляете состояние. Через 3 минуты таймер вызывает onTimer() метод обратного вызова, который генерирует Foo объект и очищает состояние.

5. Понял, я думаю, мне нужно изучить соответствующие государству знания. Еще раз спасибо!