Водяной знак сильно отстал в Flink CEP

#apache-flink #flink-streaming #flink-cep #flink-sql

#apache-flink #flink-потоковая передача #flink-cep #flink-sql

Вопрос:

Я использую Flink CEP для обнаружения шаблонов по событиям из Kafka. Для простоты события имеют только один тип. Я пытаюсь обнаружить изменение значения поля в непрерывном потоке событий. Код выглядит следующим образом

 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))
    
val pattern: Pattern[Event, _] = 
          Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))
  

Тема Kafka состоит из 104 разделов, события равномерно распределены по разделам. Когда я отправлял задание, parallelism было установлено значение 104.

Из веб-интерфейса было 2 задачи: первая Source->filter->map->timestamp/watermark ; вторая CepOperator->sink . Каждая задача получила 104 параллелизма.

Нагрузка на подзадачи была неравномерной, она должна исходить из keyBy . Водяные знаки среди подзадач были разными, но они начали зависать на значении, долгое время не меняясь. Из журналов я вижу, что CEP продолжал оценивать события, и сопоставленные результаты отправлялись в нижестоящий приемник.

Скорость событий составляла 10 кб / с, а противодавление первой задачи сохранялось high и во второй ok .

Пожалуйста, помогите объяснить, что произошло в CEP и как устранить проблему

Спасибо

Ответ №1:

Более внимательно рассмотрев ваш вопрос, я пересматриваю свой ответ.

Похоже, что CEP продолжает производить совпадения, и они выталкиваются в приемник, но задача CEP sink создает высокое противодавление. Что могло бы помочь, так это идентифицировать причину противодавления.

Если события доступны для чтения из всех разделов, и все же водяные знаки продвигаются лишь незначительно, похоже, что противодавление достаточно сильное, чтобы вообще не допустить попадания событий.

Я подозреваю

  1. комбинаторный взрыв усилий в движке CEP и / или
  2. достаточно совпадений, за которыми не может уследить приемник

как вероятные причины.

Несколько идей для получения дополнительной информации:

(1) Попробуйте использовать профилировщик, чтобы определить, является ли CepOperator узким местом, и, возможно, определить, что он делает.

(2) Отключите цепочку операторов между CepOperator и приемником, чтобы изолировать CEP — просто как этап отладки. Это даст вам лучшую видимость (с помощью метрик и мониторинга противодавления) относительно того, что делают CEP и приемник.

(3) Протестируйте это в меньшей настройке и расширьте ведение журнала CEP.

Комментарии:

1. Спасибо за ваш ответ, Дэвид. Когда я проверил подзадачу с отложенным водяным знаком, я обнаружил, что подзадача когда-либо получала данные. Как вы думаете, возможно ли, что CEP занимает слишком много времени для обработки данных в этом разделе и в конечном итоге замедляет всю работу?

2. Вы имели в виду «никогда не получал» или «все еще получен»?

3. я имею в виду «все еще получен»

4. На самом деле мы равномерно распределили события по разделам kafka. После keyBy вызова количество событий под каждым ключом неравномерно, и водяные знаки в KeyedStream застряли

5. Я включил контрольную точку с интервалом 15 минут и тайм-аутом 10 минут, но не удалось записать состояния из-за тайм-аута