#apache-kafka #apache-flink #apache-beam #apache-beam-io
Вопрос:
У меня есть вариант использования для создания приложения с отслеживанием состояния. Мне нужно построить состояние с историческими данными, хранящимися в S3, и переключиться на кафку ( с определенного смещения) с продолжающимся историческим состоянием.
У нас есть трубопровод луча с бегуном в качестве флинка. Ниже приведены шаги, которые я выполняю. Обработайте все файлы и создайте состояние. Остановите задание флинка с точкой сохранения Запустите задание флинка с состоянием, сохраненным на шаге 2, и переключитесь на кафку в качестве источника
Мой конвейер не обрабатывает никаких сообщений на шаге 3. Когда я проверяю пользовательский интерфейс flink, я замечаю, что водяной знак установлен как MAX_WATERMARK (9223372036854776000) для блока преобразования с сохранением состояния. Я ищу решение, если есть способ переопределить этот водяной знак и установить его на требуемое смещение.
Ниже приведены примеры кода и топологии. Для POC я читаю данные из локальных файлов и получаю имена файлов от кафки.
Я использую flink версии 1.9.3 beam версии 2.23.0
try {
PCollection<String> records = null;
boolean isSource1 = runningMode.equals("source1") ? true : false;
if (isSource1) {
PCollection<String> absolutePaths = pipeline
.apply("read from source topic 1", KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("file-topic")
.withKeyDeserializer(org.apache.kafka.common.serialization.LongDeserializer.class)
.withValueDeserializer(org.apache.kafka.common.serialization.StringDeserializer.class))
.apply(MapElements.into(TypeDescriptor.of(String.class)).via(record -> {
String folder = record.getKV().getValue();
String path = "file:///tmp/files/" folder "/*";
System.out.println("path -> " path);
return path;
}));
records = absolutePaths
.apply("read file names", FileIO.matchAll())
.apply("match file names", FileIO.readMatches())
.apply("read data from files", TextIO.readFiles());
} else {
records = pipeline
.apply("read from source topic 2", KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("data-topic")
.withStartReadTime(Instant.parse("2021-09-01T09:30:00-00:00"))
.withKeyDeserializer(org.apache.kafka.common.serialization.LongDeserializer.class)
.withValueDeserializer(org.apache.kafka.common.serialization.StringDeserializer.class)
)
.apply(MapElements.into(TypeDescriptor.of(String.class)).via(record -> record.getKV().getValue()));
}
PCollection<String> output = records.apply("counter", new ClientTransformation());
return output.apply("writing to output topic", KafkaIO.<Void, String>write()
.withBootstrapServers("localhost:9092")
.withTopic("output-topic").withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
.values());
} catch (Exception e) {
LOG.error("Failed to initialize pipeline due to missing coders", e.getMessage());
return null;
}