#jdbc #apache-kafka #apache-flink #flink-streaming
Вопрос:
У меня есть задание Flink 1.11, которое использует сообщения из темы Кафки, открывает их, фильтрует (ввод ключа, за которым следует пользовательская функция процесса) и сохраняет их в БД через приемник JDBC (как описано здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html)
Потребитель Kafka инициализируется с помощью этих параметров:
properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
В кластере включены контрольные точки.
Чего я хочу добиться, так это гарантии сохранения всех отфильтрованных данных в БД, даже если база данных не работает, скажем, 6 часов или при сохранении в БД возникают ошибки программирования, и задание необходимо обновить, перераспределить и перезапустить.
Чтобы это произошло, любая контрольная отметка смещений Кафки должна означать, что либо
- Данные, которые были прочитаны из Кафки, находятся в состоянии оператора Flink, ожидая фильтрации / передачи в приемник, и будут отмечены как часть проверки оператора Flink, ИЛИ
- Данные, которые были прочитаны из Кафки, уже были записаны в базу данных.
Рассматривая реализацию JdbcSink, я вижу, что на самом деле он не сохраняет никакого внутреннего состояния, которое будет проверено/восстановлено — скорее, его контрольная точка-это запись в базу данных. Теперь, если эта запись завершится неудачно во время проверки, и смещения Кафки будут сохранены, я окажусь в ситуации, когда я «потерял» данные — последующие чтения из Кафки возобновятся с зафиксированных смещений, и все данные, которые были в полете, когда произошла ошибка записи в БД, теперь больше не считываются из Кафки и не находятся в БД.
Итак, есть ли способ прекратить продвижение смещений Кафки всякий раз, когда полный конвейер (Кафка -> Флинк ->> БД) не выполняется — или потенциально решение здесь (в мире до 1.13) состоит в том, чтобы создать мою собственную реализацию функции GenericJdbcSinkFunction, которая будет поддерживать некоторое состояние значений, пока запись бд не завершится успешно?
Комментарии:
1. почему вы не используете скольжение? ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/…
2. Не уверен, как это поможет с сохранением в базе данных? Независимо от того, объединяюсь ли я с Windows или использую Windows, в конце мне нужно сохранить полученное окно в БД, и в этом моя проблема.
3. @kozyr Flink 1.13 ровно один раз обеспечил поддержку соединителя JDBC (в настоящее время не поддерживается для MySQL). Это означает, что если вы используете Kafka с поддержкой ровно один раз и JDBC, фиксация смещения во время контрольной точки должна быть прервана в случае сбоя одного из операторов. Подробнее об этом здесь
4. @YuvalItzchakov, К сожалению, я нахожусь на 1.11, так как я использую Kinesis Data Analytics для запуска Flink, а последняя версия-1.11.
5. Я не совсем понимаю ваш вопрос: вы используете контрольные точки или нет?
setCommitOffsetsOnCheckpoints
подразумевает это, но вы явно указываете в своем вопросе, что не используете контрольные точки.
Ответ №1:
Есть 3 варианта, которые я вижу:
- Попробуйте разъем JDBC 1.13 с вашей версией Flink. Есть хороший шанс, что это может просто сработать.
- Если это не сработает немедленно, проверьте, можете ли вы перенести его обратно в 1.11. Не должно быть слишком много изменений.
- Напишите свой собственный 2-фазный приемник фиксации, либо расширяя
TwoPhaseCommitSinkFunction
, либо реализуя свой собственныйSinkFunction
сCheckpointedFunction
помощью иCheckpointListener
. По сути, вы создаете новую транзакцию после успешной контрольной точки и фиксируете ее сnotifyCheckpointCompleted
помощью .