#apache-flink
#apache-flink
Вопрос:
У меня есть файлы данных hdfs, которые изначально были созданы заданием mapreduce с настройками вывода, как показано ниже,
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
Теперь я пытаюсь прочитать эти файлы с помощью Flink DataSet API (версия 1.5.6), я заглядываю в документ flink, но не могу понять, как это сделать.
- В документе есть API ‘readSequenceFile’, я просто не могу найти его в классе ExecutionEnvironment, я могу найти ‘readCsvFile’, ‘readTextFile’, но не этот.
- Есть общий ‘ReadFile (InputFormat, path)’, но я понятия не имею, что такое InputFormat, похоже, этот API не принимает формат ввода hadoop, такой как ‘SequenceFileAsBinaryInputFormat’.
Не мог бы кто-нибудь, пожалуйста, пролить свет на это? Большое спасибо.
Ответ №1:
Я думаю, что вы пропустили дополнительную зависимость: "org.apache.flink" %% "flink-hadoop-compatibility" % 1.7.2
Как только вы добавили это, вы можете запустить:
val env = ExecutionEnvironment.getExecutionEnvironment
env.createInput(HadoopInputs.readSequenceFile[Long, String](classOf[Long], classOf[String], "/data/wherever"))
Найдите более подробную документацию о том, что и как здесь https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/hadoop_compatibility.html
Надеюсь, это поможет
Комментарии:
1. Спасибо, ты спас мой день.
2. Привет, Тобиш, я хотел бы вывести результат в hdfs с помощью SequenceFileAsBinaryOutputFormat, кажется, я могу использовать HadoopOutputFormat в соответствии с приведенной вами выше ссылкой, но для этого требуется инициализировать экземпляр задания mapreduce, означает ли это, что он будет внутренне запускать задание mapreduce?
3. Я не уверен. Я бы просто попробовал. Вы должны иметь возможность видеть, что происходит в вашем менеджере заданий. Извините.