Как использовать Apache Flink с данными поиска?

#apache-flink

#apache-flink

Вопрос:

.

Привет,

используя Apache Flink 1.8. У меня есть поток записей, поступающих из Kafka в виде JSON, и я фильтрую их, и все это работает нормально.

Теперь я хотел бы обогатить данные из Kafka значением поиска из таблицы базы данных.

Это просто случай создания 2 потоков, загрузки таблицы во 2-й поток и последующего объединения данных?

Таблица базы данных обновляется, но не часто, и я бы хотел избежать поиска в БД по каждой записи, которая поступает через поток.

Ответ №1:

У Flink есть состояние, которым вы могли бы воспользоваться здесь. Я сделал нечто подобное, где я брал ежедневный запрос из своей таблицы поиска (в моем случае это был массовый вызов веб-сервиса) и просматривал результаты в теме kafka. Эта тема kafka использовалась тем же заданием службы flink, которое требовало данных для поиска. Оба раздела были помечены одним и тем же значением, но я использовал раздел поиска для сохранения данных в заданном состоянии, а при обработке другого раздела я бы извлек данные из состояния.

У меня была дополнительная логика, чтобы проверить, не было ли еще состояния для данного ключа. Если бы это было так, я бы сделал асинхронный запрос к веб-сервису. Однако вам может и не понадобиться это делать.

Предостережение здесь в том, что у меня была память для управления состоянием, и моя таблица поиска содержала всего около 30 миллионов записей, около 100 гигабайт, распределенных по 45 слотам на 15 узлах.

[В ответ на вопрос в комментариях] Извините, но мой ответ был слишком длинным, поэтому пришлось отредактировать мой пост:

У меня было задание на python, которое загружало данные с помощью массового вызова REST (ваш мог просто выполнить поиск данных). Затем он преобразовал данные в правильный формат и загрузил их в Kafka. Тогда у моего потока flink было два источника, один из которых был разделом «реальные данные», а другой — разделом «данные поиска». Данные, поступающие из раздела данных поиска, были сохранены в состоянии (я использовал ValueState, потому что каждый ключ сопоставлен с единственным возможным значением, но существуют и другие типы состояний. У меня также было 24-часовое время истечения срока действия для каждой записи, но это был мой usecase.

Хитрость заключается в том, что та же операция, которая сохраняет значение в состоянии из раздела поиска, должна быть операцией, которая извлекает значение обратно из состояния из «реальной» темы. Это связано с тем, что состояния flink (даже состояния с ключом) привязаны к оператору, который их создал.

Комментарии:

1. Моя таблица поиска не такая большая, может быть, несколько сотен строк. Но что вы использовали для загрузки данных отдельным потоком?

2. @user432024 Я обновил свой пост, чтобы ответить на ваш вопрос.

3. Прохладный. Я приму ваш ответ. На самом деле у меня была похожая идея при загрузке в Apache Ignite, поскольку я уже использую его… Существует также: ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream /…

4. Мой опыт работы с асинхронными процессами в flink не является положительным. Внутренне он, похоже, использует службу исполнителя, и если поток данных становится слишком большим, очередь задач исполнителя заполняется. Добавление очереди задач затем блокируется до тех пор, пока в очереди не останется свободного места, что создает обратное давление. Для меня это привело к увеличению задержки и в конечном итоге к удалению сообщений. По моему опыту, избегайте асинхронных процессов, если только количество запросов не может быть сведено к минимуму.

5. Хорошо, так что в моем случае. У меня будет задание, загружаемое в Apache Ignite, и у меня будет flatMap, который выполняет поиск в Ignite и таким образом обогащает данные. Пример: github.com/dataArtisans/yahoo-streaming-benchmark/blob/master /…