#apache-flink #flink-streaming #flink-cep #flink-sql
#apache-flink #flink-потоковая передача #flink-cep #flink-sql
Вопрос:
Я использую Flink CEP для обнаружения шаблонов по событиям из Kafka. Для простоты события имеют только один тип. Я пытаюсь обнаружить изменение значения поля в непрерывном потоке событий. Код выглядит следующим образом
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
.filter(...)
.map(...)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
)
.keyBy(...)(TypeInformation.of(classOf[...]))
val pattern: Pattern[Event, _] =
Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
.next("middle")
.oneOrMore()
.optional()
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.next("end").times(1)
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
!startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.within(Time.seconds(30))
Тема Kafka состоит из 104 разделов, события равномерно распределены по разделам. Когда я отправлял задание, parallelism
было установлено значение 104.
Из веб-интерфейса было 2 задачи: первая Source->filter->map->timestamp/watermark
; вторая CepOperator->sink
. Каждая задача получила 104 параллелизма.
Нагрузка на подзадачи была неравномерной, она должна исходить из keyBy
. Водяные знаки среди подзадач были разными, но они начали зависать на значении, долгое время не меняясь. Из журналов я вижу, что CEP продолжал оценивать события, и сопоставленные результаты отправлялись в нижестоящий приемник.
Скорость событий составляла 10 кб / с, а противодавление первой задачи сохранялось high
и во второй ok
.
Пожалуйста, помогите объяснить, что произошло в CEP и как устранить проблему
Спасибо
Ответ №1:
Более внимательно рассмотрев ваш вопрос, я пересматриваю свой ответ.
Похоже, что CEP продолжает производить совпадения, и они выталкиваются в приемник, но задача CEP sink создает высокое противодавление. Что могло бы помочь, так это идентифицировать причину противодавления.
Если события доступны для чтения из всех разделов, и все же водяные знаки продвигаются лишь незначительно, похоже, что противодавление достаточно сильное, чтобы вообще не допустить попадания событий.
Я подозреваю
- комбинаторный взрыв усилий в движке CEP и / или
- достаточно совпадений, за которыми не может уследить приемник
как вероятные причины.
Несколько идей для получения дополнительной информации:
(1) Попробуйте использовать профилировщик, чтобы определить, является ли CepOperator узким местом, и, возможно, определить, что он делает.
(2) Отключите цепочку операторов между CepOperator и приемником, чтобы изолировать CEP — просто как этап отладки. Это даст вам лучшую видимость (с помощью метрик и мониторинга противодавления) относительно того, что делают CEP и приемник.
(3) Протестируйте это в меньшей настройке и расширьте ведение журнала CEP.
Комментарии:
1. Спасибо за ваш ответ, Дэвид. Когда я проверил подзадачу с отложенным водяным знаком, я обнаружил, что подзадача когда-либо получала данные. Как вы думаете, возможно ли, что CEP занимает слишком много времени для обработки данных в этом разделе и в конечном итоге замедляет всю работу?
2. Вы имели в виду «никогда не получал» или «все еще получен»?
3. я имею в виду «все еще получен»
4. На самом деле мы равномерно распределили события по разделам kafka. После
keyBy
вызова количество событий под каждым ключом неравномерно, и водяные знаки в KeyedStream застряли5. Я включил контрольную точку с интервалом 15 минут и тайм-аутом 10 минут, но не удалось записать состояния из-за тайм-аута