Обработка опасных сообщений в Apache Flink

#apache-flink

#apache-flink

Вопрос:

Я пытаюсь определить наилучшие методы работы с ядовитыми сообщениями / необработанными исключениями с помощью Apache Flink. У нас есть работа по обработке событий в реальном времени данных о местоположении с устройств Интернета вещей. Существует два возможных сценария, в которых это может возникнуть:

  1. Данные каким-то образом неверны — например, недопустимое значение
  2. Данные вызывают ошибку из-за какого-то крайнего случая, которого мы не ожидали.

В настоящее время вся моя обработка данных останавливается только из-за одного сообщения.

Я видел два предложения:

  1. Перехват исключений — это требует, чтобы я оборачивал каждую часть логики чем-то, чтобы перехватывать каждое исключение во время выполнения
  2. Используйте побочные выходы как своего рода DLQ — из того, что я могу сказать, это, похоже, вариант # 1, где я должен перехватывать все исключения и отправлять их на боковой вывод.

Неужели нет другого способа сделать это, кроме как обернуть каждую часть логики обработкой исключений? Нет ли общего способа перехватывать исключения и не продолжать обработку?

Комментарии:

1. откуда приходят сообщения? Kafka?

2. Почему это имеет значение? Иногда Rabbit MQ, иногда MQTT, иногда через websockets. Rabbit MQ — единственный готовый источник, который мы используем.

Ответ №1:

Я думаю, идея состоит не в том, чтобы перехватывать все виды исключений и отправлять их в другое место, а в том, чтобы иметь хорошо протестированный и функционирующий код и использовать мертвые буквы только для недопустимых входных данных.

Таким образом, типичный конвейер будет

 source => validate => ... => sink
                  => dead letter queue
  

Как только ваша запись проходит ваш оператор validate, вы хотите, чтобы все ошибки всплывали, поскольку любая ошибка в этих операторах может привести к повреждению агрегированных данных и данных, которые после записи не могут быть легко восстановлены.

Шаг проверки будет работать с любым из двух описанных вами подходов. Как правило, побочные выходы имеют лучшую семантику, но в итоге может получиться больше кода.


Теперь у вас может быть служба с высокими SLA, и вы действительно хотите, чтобы она выдавала выходные данные, даже если она повреждена только для получения данных. Или у вас есть простой конвейер преобразования, в котором вы пропустите некоторые события, но сохраните большинство (а нисходящий поток может иметь дело с неполными данными). Тогда вы правы в том, что вам нужно обернуть код всех операторов с помощью try-catch . Однако, как правило, вы все равно будете делать это только для хрупких операторов, а не для всех из них. Тривиальные операторы должны быть протестированы, а затем доверены для работы. Кроме того, вы обычно перехватываете только определенные типы исключений, чтобы ограничить область действия ожидаемыми исключениями, которые могут произойти.

Вы можете задаться вопросом, почему Flink не включает его в качестве шаблона по умолчанию. Насколько я вижу, есть две причины:

  1. Если Flink молча игнорирует какие-либо исключения и отправляет дополнительное сообщение вторичному приемнику, как Flink может гарантировать, что после этого вызывающий оператор находится в нормальном состоянии? Как можно избежать любых утечек, которые могут произойти из-за того, что код очистки не выполняется?
  2. В Java чаще всего разработчики явно рассуждают об исключениях и обработке исключений. Также непросто понять, каковы требования: вы хотите иметь только входные данные? Вы также хотите сохранить исключение? Как насчет состояния оператора, которое могло повлиять на результат? Должен ли Flink по-прежнему завершаться сбоем, если за заданный промежуток времени получено слишком много ошибок? Это быстро становится важной функцией для чего-то, чего вообще не должно происходить в идеальном мире, где данные высокого качества обрабатываются и обрабатываются должным образом.

Таким образом, хотя это выглядит просто для вашего случая, потому что вы точно знаете, какие виды информации вы хотите сохранить, нелегко найти решение для всех целей, тем более что дополнительный код, который должен написать пользователь, крошечный по сравнению с общим решением.


Что вы могли бы сделать, так это извлечь большинство сложных логических элементов в один ProcessFunction и использовать побочные выходы, как вы описали. Поскольку это центральная часть, вам нужно будет написать функцию побочного вывода только один раз. Если это выполняется несколько раз, вы можете извлечь вспомогательную функцию, в которой вы передаете свой фактический код как RunnableWithException лямбда-выражение, которое скрывает всю логику побочного вывода. Убедитесь, что вы используете много finally блоков, чтобы обеспечить нормальное состояние.

Я бы также добавил немало ИТ-кейсов и использовал тестирование мутаций, чтобы быстрее укрепить ваш конвейер. Если вы сохраняете свои тестовые данные встроенными, мутанты могут также точно имитировать ваши неожиданные проблемы с данными, так что ваш оператор проверки становится более полным.

Комментарии:

1. Спасибо, Арвид. Я понимаю точку зрения, но я не уверен, как я к этому отношусь 🙂 В моем случае данные с потерями лучше, чем отсутствие данных, так что это, вероятно, влияет на мое мышление. Мне нравится идея обернуть логику в ProcessFunction и использовать ее для направления поврежденных данных на побочные выходы.

2. В прошлом у меня был похожий проект, где нам действительно нужна была доступность (и мы были уверены, что наш код работает для большинства событий). Итак, да, мы завершили несколько преобразований, подверженных ошибкам при обработке исключений, и написали об этом во второстепенной теме. Дополнительный код был незначительным, и у нас был явный выбор, что обрабатывать, а что нет. Кстати, это было с Kafka Streams , поэтому я предполагаю, что вам нужно подумать об обработке исключений для всех потоковых процессоров на базе Java.

3. Конечно, вам нужно поместить предупреждения в тему ошибки. Существует риск того, что вы пропустите изменение формата и накопите в нем огромные куски событий. Вам придется регулярно просматривать эту тему и улучшать свой конвейер. Если у вас есть некоторый контроль над данными, использование событий на основе схемы также значительно помогает уменьшить неожиданные форматы.