#apache-flink
#apache-flink
Вопрос:
.
Привет,
используя Apache Flink 1.8. У меня есть поток записей, поступающих из Kafka в виде JSON, и я фильтрую их, и все это работает нормально.
Теперь я хотел бы обогатить данные из Kafka значением поиска из таблицы базы данных.
Это просто случай создания 2 потоков, загрузки таблицы во 2-й поток и последующего объединения данных?
Таблица базы данных обновляется, но не часто, и я бы хотел избежать поиска в БД по каждой записи, которая поступает через поток.
Ответ №1:
У Flink есть состояние, которым вы могли бы воспользоваться здесь. Я сделал нечто подобное, где я брал ежедневный запрос из своей таблицы поиска (в моем случае это был массовый вызов веб-сервиса) и просматривал результаты в теме kafka. Эта тема kafka использовалась тем же заданием службы flink, которое требовало данных для поиска. Оба раздела были помечены одним и тем же значением, но я использовал раздел поиска для сохранения данных в заданном состоянии, а при обработке другого раздела я бы извлек данные из состояния.
У меня была дополнительная логика, чтобы проверить, не было ли еще состояния для данного ключа. Если бы это было так, я бы сделал асинхронный запрос к веб-сервису. Однако вам может и не понадобиться это делать.
Предостережение здесь в том, что у меня была память для управления состоянием, и моя таблица поиска содержала всего около 30 миллионов записей, около 100 гигабайт, распределенных по 45 слотам на 15 узлах.
[В ответ на вопрос в комментариях] Извините, но мой ответ был слишком длинным, поэтому пришлось отредактировать мой пост:
У меня было задание на python, которое загружало данные с помощью массового вызова REST (ваш мог просто выполнить поиск данных). Затем он преобразовал данные в правильный формат и загрузил их в Kafka. Тогда у моего потока flink было два источника, один из которых был разделом «реальные данные», а другой — разделом «данные поиска». Данные, поступающие из раздела данных поиска, были сохранены в состоянии (я использовал ValueState, потому что каждый ключ сопоставлен с единственным возможным значением, но существуют и другие типы состояний. У меня также было 24-часовое время истечения срока действия для каждой записи, но это был мой usecase.
Хитрость заключается в том, что та же операция, которая сохраняет значение в состоянии из раздела поиска, должна быть операцией, которая извлекает значение обратно из состояния из «реальной» темы. Это связано с тем, что состояния flink (даже состояния с ключом) привязаны к оператору, который их создал.
Комментарии:
1. Моя таблица поиска не такая большая, может быть, несколько сотен строк. Но что вы использовали для загрузки данных отдельным потоком?
2. @user432024 Я обновил свой пост, чтобы ответить на ваш вопрос.
3. Прохладный. Я приму ваш ответ. На самом деле у меня была похожая идея при загрузке в Apache Ignite, поскольку я уже использую его… Существует также: ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream /…
4. Мой опыт работы с асинхронными процессами в flink не является положительным. Внутренне он, похоже, использует службу исполнителя, и если поток данных становится слишком большим, очередь задач исполнителя заполняется. Добавление очереди задач затем блокируется до тех пор, пока в очереди не останется свободного места, что создает обратное давление. Для меня это привело к увеличению задержки и в конечном итоге к удалению сообщений. По моему опыту, избегайте асинхронных процессов, если только количество запросов не может быть сведено к минимуму.
5. Хорошо, так что в моем случае. У меня будет задание, загружаемое в Apache Ignite, и у меня будет flatMap, который выполняет поиск в Ignite и таким образом обогащает данные. Пример: github.com/dataArtisans/yahoo-streaming-benchmark/blob/master /…