Как записать в таблицу s3 в flink без ошибки обновления и удаления изменений?

#java #amazon-s3 #apache-flink #flink-batch

Вопрос:

Рассмотрим код:

 import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

class Scratch {
  public static void main(String[] args) {

    StreamTableEnvironment tableEnv = /*some init code here*/;

    tableEnv.executeSql("CREATE TABLE my_table (n"  
        "                                  id STRING,n"  
        "                                  createdDate DATE,n"  
        "                                  `date` STRING "  
        "                                ) PARTITIONED BY (`date`) n"  
        "                                WITH (n"  
        "                                  'connector' = 'filesystem',n"  
        "                                  'path' = 's3://my-bucket/',n"  
        "                                  'format' = 'json'n"  
        "                                )");

    tableEnv.executeSql("CREATE TABLE output_table  (n"  
        "  id STRING,n"  
        "  created_date DATE,n"  
        "  count_value BIGINT,n"  
        "  PRIMARY KEY (id, created_date) NOT ENFORCEDn"  
        ") WITH (n"  
        "   'connector' = 'filesystem', n"  
        "   'path' = 's3://some-bucket/output-table/',n"  
        "   'format' = 'json'n"  
        " )");
    Table temp = tableEnv.sqlQuery(
        " SELECT id as id, "  
            " max(createdDate) as created_date, "  
            " COUNT(DISTINCT(id)) as count_value  "  
            " from my_tablen"  
            "    GROUP BY createdDate, id"
    );
    temp.executeInsert("output_table");

  }
}
 

Это приведет меня к ошибке:

орг.апач.флинк.клиент.программа.Исключение ProgramInvocationException: Основной метод вызвал ошибку: Приемник таблицы ‘default_catalog.default_database.output_table’ не поддерживает использование изменений обновления, создаваемых группой узлов(select=[MIN($f0) В КАЧЕСТВЕ идентификатора, MAX(CreatedDate) В КАЧЕСТВЕ created_date, КОЛИЧЕСТВО(ОТЛИЧНОЕ ОТ $f2) КАК значение count_value ])

Есть ли способ записать агрегацию в s3 через flink? (флинк запускается в пакетном режиме)

Ответ №1:

Как бы то ни было, вы выполняете запрос в потоковом режиме, для чего требуется приемник, который может обрабатывать обновления и удаления, поступающие из агрегации.

Это сработает, если вы либо

  • выводите результаты в формате CDC (список изменений), например debezium,
  • или запустите задание в пакетном режиме

Для запуска в пакетном режиме вы можете сделать это:

 import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);
 

Если вам нужно использовать табличный API в режиме пакетного выполнения, одновременно имея доступ к API потока данных, это возможно только с Flink 1.14.