Флинк, Кафка и JDBC раковина

#jdbc #apache-kafka #apache-flink #flink-streaming

Вопрос:

У меня есть задание Flink 1.11, которое использует сообщения из темы Кафки, открывает их, фильтрует (ввод ключа, за которым следует пользовательская функция процесса) и сохраняет их в БД через приемник JDBC (как описано здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html)

Потребитель Kafka инициализируется с помощью этих параметров:

 properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

 

В кластере включены контрольные точки.

Чего я хочу добиться, так это гарантии сохранения всех отфильтрованных данных в БД, даже если база данных не работает, скажем, 6 часов или при сохранении в БД возникают ошибки программирования, и задание необходимо обновить, перераспределить и перезапустить.

Чтобы это произошло, любая контрольная отметка смещений Кафки должна означать, что либо

  1. Данные, которые были прочитаны из Кафки, находятся в состоянии оператора Flink, ожидая фильтрации / передачи в приемник, и будут отмечены как часть проверки оператора Flink, ИЛИ
  2. Данные, которые были прочитаны из Кафки, уже были записаны в базу данных.

Рассматривая реализацию JdbcSink, я вижу, что на самом деле он не сохраняет никакого внутреннего состояния, которое будет проверено/восстановлено — скорее, его контрольная точка-это запись в базу данных. Теперь, если эта запись завершится неудачно во время проверки, и смещения Кафки будут сохранены, я окажусь в ситуации, когда я «потерял» данные — последующие чтения из Кафки возобновятся с зафиксированных смещений, и все данные, которые были в полете, когда произошла ошибка записи в БД, теперь больше не считываются из Кафки и не находятся в БД.

Итак, есть ли способ прекратить продвижение смещений Кафки всякий раз, когда полный конвейер (Кафка -> Флинк ->> БД) не выполняется — или потенциально решение здесь (в мире до 1.13) состоит в том, чтобы создать мою собственную реализацию функции GenericJdbcSinkFunction, которая будет поддерживать некоторое состояние значений, пока запись бд не завершится успешно?

Комментарии:

1. почему вы не используете скольжение? ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/…

2. Не уверен, как это поможет с сохранением в базе данных? Независимо от того, объединяюсь ли я с Windows или использую Windows, в конце мне нужно сохранить полученное окно в БД, и в этом моя проблема.

3. @kozyr Flink 1.13 ровно один раз обеспечил поддержку соединителя JDBC (в настоящее время не поддерживается для MySQL). Это означает, что если вы используете Kafka с поддержкой ровно один раз и JDBC, фиксация смещения во время контрольной точки должна быть прервана в случае сбоя одного из операторов. Подробнее об этом здесь

4. @YuvalItzchakov, К сожалению, я нахожусь на 1.11, так как я использую Kinesis Data Analytics для запуска Flink, а последняя версия-1.11.

5. Я не совсем понимаю ваш вопрос: вы используете контрольные точки или нет? setCommitOffsetsOnCheckpoints подразумевает это, но вы явно указываете в своем вопросе, что не используете контрольные точки.

Ответ №1:

Есть 3 варианта, которые я вижу:

  1. Попробуйте разъем JDBC 1.13 с вашей версией Flink. Есть хороший шанс, что это может просто сработать.
  2. Если это не сработает немедленно, проверьте, можете ли вы перенести его обратно в 1.11. Не должно быть слишком много изменений.
  3. Напишите свой собственный 2-фазный приемник фиксации, либо расширяя TwoPhaseCommitSinkFunction , либо реализуя свой собственный SinkFunction с CheckpointedFunction помощью и CheckpointListener . По сути, вы создаете новую транзакцию после успешной контрольной точки и фиксируете ее с notifyCheckpointCompleted помощью .