Конвейер flink не обрабатывает сообщения Кафки после переключения источника

#apache-kafka #apache-flink #apache-beam #apache-beam-io

Вопрос:

У меня есть вариант использования для создания приложения с отслеживанием состояния. Мне нужно построить состояние с историческими данными, хранящимися в S3, и переключиться на кафку ( с определенного смещения) с продолжающимся историческим состоянием.

У нас есть трубопровод луча с бегуном в качестве флинка. Ниже приведены шаги, которые я выполняю. Обработайте все файлы и создайте состояние. Остановите задание флинка с точкой сохранения Запустите задание флинка с состоянием, сохраненным на шаге 2, и переключитесь на кафку в качестве источника

Мой конвейер не обрабатывает никаких сообщений на шаге 3. Когда я проверяю пользовательский интерфейс flink, я замечаю, что водяной знак установлен как MAX_WATERMARK (9223372036854776000) для блока преобразования с сохранением состояния. Я ищу решение, если есть способ переопределить этот водяной знак и установить его на требуемое смещение.

Ниже приведены примеры кода и топологии. Для POC я читаю данные из локальных файлов и получаю имена файлов от кафки.

Я использую flink версии 1.9.3 beam версии 2.23.0

 try {
    PCollection<String> records = null;
    boolean isSource1 = runningMode.equals("source1") ? true : false;

    if (isSource1) {
        PCollection<String> absolutePaths = pipeline
                .apply("read from source topic 1", KafkaIO.<Long, String>read()
                        .withBootstrapServers("localhost:9092")
                        .withTopic("file-topic")
                        .withKeyDeserializer(org.apache.kafka.common.serialization.LongDeserializer.class)
                        .withValueDeserializer(org.apache.kafka.common.serialization.StringDeserializer.class))
                .apply(MapElements.into(TypeDescriptor.of(String.class)).via(record -> {
                    String folder = record.getKV().getValue();
                    String path = "file:///tmp/files/"   folder   "/*";
                    System.out.println("path -> "   path);
                    return path;
                }));
        records = absolutePaths
                .apply("read file names", FileIO.matchAll())
                .apply("match file names", FileIO.readMatches())
                .apply("read data from files", TextIO.readFiles());
    } else {
        records = pipeline
                .apply("read from source topic 2", KafkaIO.<Long, String>read()
                        .withBootstrapServers("localhost:9092")
                        .withTopic("data-topic")
                        .withStartReadTime(Instant.parse("2021-09-01T09:30:00-00:00"))
                        .withKeyDeserializer(org.apache.kafka.common.serialization.LongDeserializer.class)
                        .withValueDeserializer(org.apache.kafka.common.serialization.StringDeserializer.class)
                )
                .apply(MapElements.into(TypeDescriptor.of(String.class)).via(record -> record.getKV().getValue()));
    }

    PCollection<String> output = records.apply("counter", new ClientTransformation());
    return output.apply("writing to output topic", KafkaIO.<Void, String>write()
            .withBootstrapServers("localhost:9092")
            .withTopic("output-topic").withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
            .values());

} catch (Exception e) {
    LOG.error("Failed to initialize pipeline due to missing coders", e.getMessage());
    return null;
}
 

топология на шаге 1
топология на шаге 3