Временное объединение во времени событий в Apache Flink работает только с небольшими наборами данных

#join #apache-flink #temporal

#Присоединиться #apache-flink #временное

Вопрос:

Справочная информация: я пытаюсь получить временное объединение во времени событий, работающее с двумя большими (r) наборами данных / таблицами, которые считываются из CSV-файла (16K строк в левой таблице, несколько меньше в правой таблице). Обе таблицы являются таблицами только для добавления, т. Е. Их источниками данных в настоящее время являются CSV-файлы, но станут журналами изменений CDC, отправленными Debezium через Pulsar. Я использую довольно новый SYSTEM_TIME AS OF синтаксис.

Проблема: результаты объединения корректны лишь частично, т. Е. В начале (первые 20% или около того) выполнения запроса строки левой части не сопоставляются со строками с правой стороны, хотя теоретически они должны. Через пару секунд появляется больше совпадений, и к моменту завершения запроса строки левой части правильно сопоставляются / соединяются со строками правой стороны. Каждый раз, когда я запускаю запрос, он показывает другие результаты, в терминах которых строки (не) совпадают.

Оба набора данных не упорядочены по их соответствующим временам событий. Они упорядочены по их первичному ключу. Так что это действительно так, только с большим количеством данных.

По сути, правая сторона — это таблица поиска, которая меняется со временем, и мы уверены, что для каждой левой записи была соответствующая правая запись, поскольку обе были созданы в исходной базе данных в /- одно и то же мгновение. В конечном итоге наша цель — динамическое материализованное представление, содержащее те же данные, что и при объединении 2 таблиц в исходной базе данных с поддержкой CDC (SQL Server).

Очевидно, что я хочу добиться правильного объединения по всему набору данных, как описано в документах Flink
В отличие от простых примеров и тестового кода Flink с небольшим набором данных, состоящим всего из нескольких строк (как здесь ), объединение больших наборов данных не дает правильных результатов.

Я подозреваю, что, когда таблица зондирования / левой таблицы начинает течь, таблица построения / правой таблицы еще не находится «в памяти», что означает, что левые строки не находят соответствующую правую строку, в то время как они должны — если бы правильная таблица начала течь несколько раньше. Вот почему left join возвращает нулевые значения для столбцов правой таблицы.

Я включил свой код:

 @Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {

    private final String emr01Ddl =
            "CREATE TABLE EMR01n"  
                    "(n"  
                    "    SRC_NO         STRING,n"  
                    "    JRD_ETT_NO     STRING,n"  
                    "    STT_DT         DATE,n"  
                    "    MGT_SLT_DT     DATE,n"  
                    "    ATM_CRT_DT     DATE,n"  
                    "    LTD_MDT_IC     STRING,n"  
                    "    CPN_ORG_NO     STRING,n"  
                    "    PTY_NO         STRING,n"  
                    "    REG_USER_CD    STRING,n"  
                    "    REG_TS         TIMESTAMP,n"  
                    "    MUT_USER_CD    STRING,n"  
                    "    MUT_TS         TIMESTAMP(3),n"  
                    "    WATERMARK FOR MUT_TS AS MUT_TS,n"  
                    "    PRIMARY KEY (CPN_ORG_NO) NOT ENFORCEDn"  
                    ") WITH (n"  
                    "   'connector' = 'filesystem',n"  
                    "   'path' = '"   getCsv1()   "',n"  
                    "   'format' = 'csv'n"  
                    ")";

    private final String emr02Ddl =
            "CREATE TABLE EMR02n"  
                    "(n"  
                    "    CPN_ORG_NO  STRING,n"  
                    "    DSB_TX      STRING,n"  
                    "    REG_USER_CD STRING,n"  
                    "    REG_TS      TIMESTAMP,n"  
                    "    MUT_USER_CD STRING,n"  
                    "    MUT_TS      TIMESTAMP(3),n"  
                    "    WATERMARK FOR MUT_TS AS MUT_TS,n"  
                    "    PRIMARY KEY (CPN_ORG_NO) NOT ENFORCEDn"  
                    ") WITH (n"  
                    "   'connector' = 'filesystem',n"  
                    "   'path' = '"   getCsv2()   "',n"  
                    "   'format' = 'csv'n"  
                    ")";

    @Test
    public void testEventTimeTemporalJoin() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(emr01Ddl);
        tableEnv.executeSql(emr02Ddl);

        Table result = tableEnv.sqlQuery(""  
                "SELECT *"  
                "   FROM EMR01"  
                "   LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS"  
                "       ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");

        tableEnv.toChangelogStream(result).addSink(new TestSink());
        env.execute();

        System.out.println("[Count]"   TestSink.values.size());
        //System.out.println("[Row 1]"   TestSink.values.get(0));
        //System.out.println("[Row 2]"   TestSink.values.get(1));
        AtomicInteger i = new AtomicInteger();
        TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row "   i.incrementAndGet()   " ]="   value));
    }

    private static class TestSink implements SinkFunction<Row> {

        // must be static
        public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Row value, SinkFunction.Context context) {
            values.add(value);
        }
    }

    String getCsv1() {
        try {
            return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    String getCsv2() {
        try {
            return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}
 

Есть ли способ решить эту проблему? Например. есть ли способ СНАЧАЛА загрузить правую сторону в состояние Flink, а ЗАТЕМ начать загрузку / потоковую передачу левой стороны? Будет ли это хорошим подходом, потому что возникает вопрос: насколько позже? в какое время левая сторона может начать течь?

Мы используем Flink 1.13.3.

Ответ №1:

Такого рода временное / версионное объединение зависит от наличия точных водяных знаков. Flink полагается на водяные знаки, чтобы знать, какие строки можно безопасно удалить из поддерживаемого состояния (потому что они больше не могут влиять на результаты).

Используемые вами водяные знаки указывают на то, что строки упорядочены по MUT_TS . Поскольку это неверно, объединение не может выдавать полные результаты.

Чтобы исправить это, водяные знаки должны быть определены примерно так

 WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE
 

где интервал указывает, сколько неупорядоченности необходимо учесть.

Комментарии:

1. Спасибо, Дэвид, по-видимому, это исправило проблему — ты действительно спас для меня день. Но я полагаю, что мне просто «повезло», и 2 минут, кажется, достаточно. Оба потока фактически (логически) поступают из Debezium и упорядочены по PK (последовательности). Таким образом, записи с более низким значением PK также имеют более низкие значения MUT_TS (= REG_TS при первоначальном создании). Конечно, это нельзя назвать совпадением, но если бы PK не был последовательностью, это бы вообще не сработало, верно? Итак, в целом, не было бы лучше, чтобы оба потока упорядочивались по MUT_TS ? Но возникает вопрос: возможно ли последнее с помощью Debezium?

2. Debezium создает события в том же порядке, в котором они произошли; Я думаю, все должно быть в порядке.