#java #amazon-s3 #apache-flink #flink-batch
Вопрос:
Рассмотрим код:
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
class Scratch {
public static void main(String[] args) {
StreamTableEnvironment tableEnv = /*some init code here*/;
tableEnv.executeSql("CREATE TABLE my_table (n"
" id STRING,n"
" createdDate DATE,n"
" `date` STRING "
" ) PARTITIONED BY (`date`) n"
" WITH (n"
" 'connector' = 'filesystem',n"
" 'path' = 's3://my-bucket/',n"
" 'format' = 'json'n"
" )");
tableEnv.executeSql("CREATE TABLE output_table (n"
" id STRING,n"
" created_date DATE,n"
" count_value BIGINT,n"
" PRIMARY KEY (id, created_date) NOT ENFORCEDn"
") WITH (n"
" 'connector' = 'filesystem', n"
" 'path' = 's3://some-bucket/output-table/',n"
" 'format' = 'json'n"
" )");
Table temp = tableEnv.sqlQuery(
" SELECT id as id, "
" max(createdDate) as created_date, "
" COUNT(DISTINCT(id)) as count_value "
" from my_tablen"
" GROUP BY createdDate, id"
);
temp.executeInsert("output_table");
}
}
Это приведет меня к ошибке:
орг.апач.флинк.клиент.программа.Исключение ProgramInvocationException: Основной метод вызвал ошибку: Приемник таблицы ‘default_catalog.default_database.output_table’ не поддерживает использование изменений обновления, создаваемых группой узлов(select=[MIN($f0) В КАЧЕСТВЕ идентификатора, MAX(CreatedDate) В КАЧЕСТВЕ created_date, КОЛИЧЕСТВО(ОТЛИЧНОЕ ОТ $f2) КАК значение count_value ])
Есть ли способ записать агрегацию в s3 через flink? (флинк запускается в пакетном режиме)
Ответ №1:
Как бы то ни было, вы выполняете запрос в потоковом режиме, для чего требуется приемник, который может обрабатывать обновления и удаления, поступающие из агрегации.
Это сработает, если вы либо
- выводите результаты в формате CDC (список изменений), например debezium,
- или запустите задание в пакетном режиме
Для запуска в пакетном режиме вы можете сделать это:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
Если вам нужно использовать табличный API в режиме пакетного выполнения, одновременно имея доступ к API потока данных, это возможно только с Flink 1.14.