Как записать данные в FS, HDFS или S3 с помощью приемника файлов Flink с полными разрешениями

#scala #io #hdfs #apache-flink

Вопрос:

У меня есть конвейер с Flink 13 и Kafka для HDFS (или FS). Для записи строковых файлов в HDFS я использую приемник файлов (форматы с кодировкой строк), а созданные файлы не имеют полных разрешений

 -rw-r--r-- 1 flink flink 1545571 Oct  4 07:08 part-a8a42755-2d30-4b16-a93c-b9407764980f-1
-rw-r--r-- 1 flink flink 1545182 Oct  4 07:09 part-a8a42755-2d30-4b16-a93c-b9407764980f-2
 

Не могли бы вы помочь мне, пожалуйста, настроить приемник файлов Flink для записи файлов в хранилище FS, HDFS или S3 с управляемыми разрешениями, например

 drwxrwxrwx
 

Код

 import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val input: DataStream[String] = ...

val sink: FileSink[String] = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build()

input.sinkTo(sink)
 

Код из документов
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/file_sink/#row-encoded-formats

Предоставление родительскому каталогу в диспетчере заданий полных разрешений не помогло

 drwxrwxrwt   1 root root 4096 Oct  4 18:38 tmp
drwxrwxrwx 1 root  root  4096 Oct  4 18:38 flink-output
drwxrwxrwx 1 flink flink 4096 Oct  4 18:47 2021-10-04--18
-rw-r--r-- 1 flink flink 1557542 Oct  4 18:48 part-87dfe0af-87b1-4a6d-b244-7329b5c567db-9
 

Комментарии:

1. Перед запуском программы предоставьте родительскому каталогу полные разрешения