#scala #apache-flink
Вопрос:
Я новичок в Флинке. Я действительно не понимаю, как кэшировать файлы и загружать их в набор данных ? Я не могу найти простого примера. Я в замешательстве, почему нам нужно сначала создать набор данных, чтобы вызвать «функцию RichMapFunction» ? Как я кэширую файл, который не имеет ничего общего с любым другим набором данных? В примере, который я нашел, он как бы выполнял объединение с другим набором данных. Спасибо.
Ответ №1:
В случае объединения двух наборов данных, а один набор данных невелик, используйте широковещательную передачу, чтобы избежать перетасовки. Без широковещательной передачи перетасовать большой набор данных-сущая мука.
Например, в одном наборе данных содержится 1 миллиард записей, в другом-100 записей. С помощью широковещательной передачи небольшой набор данных будет распределен среди всех менеджеров задач, обрабатывающих эти 1 миллиард записей, — без перемещения 1 миллиарда записей для соединения. Без трансляции типичным поведением операции присоединения является перетасовка 1 миллиарда записей и 100 записей, чтобы записи с одним и тем же ключом находились в одной и той же машине, что намного дороже по сравнению с трансляцией.
Функция Richmapфункция предоставляет метод open() и метод для доступа к RuntimeContext. В функции open() задание Flink может получить широковещательный набор данных через getRuntimeContext (). getBroadcastVariable (). Функция open() вызывается только один раз для каждого оператора, поэтому транслируемый набор данных инициализируется один раз, а затем может быть применен ко всем входящим записям. Именно по этой причине следует использовать функцию RichMapFunction() вместо функции MapFunction().
Примечание — Широковещательная передача применяется в том случае, если набор данных для широковещательной передачи невелик. Сначала необходимо создать набор данных, а затем передать набор данных всем операторам. Пожалуйста, обратитесь сюда для использования API.
Для распределенного кэширования файлов это относится к тому случаю, когда операции(например, операции с картой) необходимо один раз загрузить внешний файл и использовать его в операции.
Например, обученная модель сохраняется в формате HDFS. В задании Flink ему необходимо загрузить модель и применить ее к каждой записи. В этом случае задание Flink может использовать API распределенного файлового кэша. Файл модели будет извлечен из HDFS на локальную машину, и все задачи, выполняемые на этой машине, могут совместно использовать извлеченный файл локально, что экономит сеть и время.
Вам не нужно создавать набор данных для распространяемого файла, а нужно использовать registerCachedFile(). Как и по той же причине для широковещательного набора данных, использование функции RichMapFunction позволяет заданию Flink загружать/инициализировать распределенный файл один раз.
Пожалуйста, обратитесь к этому документу для использования.