#scala #io #hdfs #apache-flink
Вопрос:
У меня есть конвейер с Flink 13 и Kafka для HDFS (или FS). Для записи строковых файлов в HDFS я использую приемник файлов (форматы с кодировкой строк), а созданные файлы не имеют полных разрешений
-rw-r--r-- 1 flink flink 1545571 Oct 4 07:08 part-a8a42755-2d30-4b16-a93c-b9407764980f-1
-rw-r--r-- 1 flink flink 1545182 Oct 4 07:09 part-a8a42755-2d30-4b16-a93c-b9407764980f-2
Не могли бы вы помочь мне, пожалуйста, настроить приемник файлов Flink для записи файлов в хранилище FS, HDFS или S3 с управляемыми разрешениями, например
drwxrwxrwx
Код
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
val input: DataStream[String] = ...
val sink: FileSink[String] = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build()
input.sinkTo(sink)
Код из документов
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/file_sink/#row-encoded-formats
Предоставление родительскому каталогу в диспетчере заданий полных разрешений не помогло
drwxrwxrwt 1 root root 4096 Oct 4 18:38 tmp
drwxrwxrwx 1 root root 4096 Oct 4 18:38 flink-output
drwxrwxrwx 1 flink flink 4096 Oct 4 18:47 2021-10-04--18
-rw-r--r-- 1 flink flink 1557542 Oct 4 18:48 part-87dfe0af-87b1-4a6d-b244-7329b5c567db-9
Комментарии:
1. Перед запуском программы предоставьте родительскому каталогу полные разрешения