Как читать файлы в flink, если его необработанный файл журнала использует scala?

#scala #apache-flink #flink-streaming

Вопрос:

Я пытаюсь прочитать необработанный файл журнала в flink, чтобы обработать его в kinesis .Однако в flink scala, как читать необработанные файлы вместо использования текстового файла, так как текстовый файл создает поток данных[Строка], что создает проблему с форматированием.Я пробовал это, но это для чтения текста:

 val inputStream = env.readTextFile(config.input.stream)
inputStream.print()

inputStream.map{str =>
  val strByte = str.getBytes()
  val thriftSerializer = new LazyBinaryThriftStructSerializer[CollectorPayload] {
    override def codec: CollectorPayload.type = CollectorPayload
  }
  val collectorPayload = thriftSerializer.fromBytes(strByte)
  collectorPayload
}.print()
 

Здесь он не преобразует данные в надлежащий формат, поэтому хотел прочитать как двоичный файл, возможно ли это?

Ответ №1:

Вместо использования readTextFile(path) вы можете использовать readFile(fileInputFormat, path) и предоставлять fileInputFormat то, что соответствует вашим потребностям.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/org/apache/flink/api/common/io/class-use/FileInputFormat.html

Комментарии:

1. файл не имеет расширения, так как определить формат можно в любом примере