#spring-integration
Вопрос:
Я пытаюсь прочитать файл из GCP на основе уведомления, полученного в соответствии с потоком, определенным ниже:
Средство чтения файлов — Десериализует данные в сбор и отправляет для маршрутизации.
Я удаляю данные из коллекции объектов и отправляю их маршрутизатору для дальнейшей обработки. Поскольку у меня нет контроля над размером файла, я думаю о каком-то подходе к пакетной обработке процесса чтения.
В настоящее время активатор службы чтения файлов возвращает все Collection
десериализованные объекты.
Вопрос:
- В случае, если я получу файл большего размера, т. Е. с записями 200 тыс., я хочу отправлять его пакетами на маршрутизатор значений заголовка, а не в коллекцию из 200 тыс. объектов.
- Если я конвертирую средство чтения файлов в разделитель и добавлю агрегатор после этого уведомления- > средство чтения файлов — > > агрегатор- > > > маршрутизатор. Мне все равно нужно было бы вернуть коллекцию всех объектов, а не итератор.
- Я не хочу загружать все записи в коллекцию.
Обновленный подход:
public <S> Collection<S> readData(DataInfo dataInfo, Class<S> clazz) {
Resource gcpResource = context.getResource("classpath://data.json")
var tempDataSet = new HashSet<S>();
AtomicInteger pivot = new AtomicInteger();
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gcpResource.getInputStream()))) {
bufferedReader.lines().map((dataStr) -> {
try {
var data = deserializeData(dataStr, clazz);
return data;
} catch (JsonProcessingException ex) {
throw new CustomException("PARSER-1001", "Error occurred while parsing", ex);
}
}).forEach(data -> {
if (BATCH_SIZE == pivot.get()) {
//When the size in tempDataSet reached BATCH_SIZE send the data in routing channel and reset the pivot
var message = MessageBuilder.withPayload(tempDataSet.clone())
.setHeader(AppConstants.EVENT_HEADER_KEY, eventType)
.build();
routingChannel.send(message);
pivot.set(0);
tempDataSet.removeAll(tempDataSet);
} else {
pivot.addAndGet(1);
tempDataSet.add(data);
}
});
return tempDataSet;
} catch (Exception ex) {
throw new CustomException("PARSER-1002", "Error occurred while parsing", ex);
}
}
Если размер пакета в 100 и мы получили 1010 объектов. Будут созданы 11 пакетов, 10 со 100 и последний с 10 объектами в нем.
В случае, если я использую разделитель и передам ему поток, будет ли он ждать завершения всего потока, а затем отправлять собранную коллекцию, или мы сможем достичь чего-то близкого к предыдущему подходу, используя его?
Ответ №1:
Не уверен, в чем вопрос, но я бы выбрал FileSplitter
Aggregator
решение. Первый из них предназначен именно для использования при чтении потоковых файлов. Второй позволяет буферизировать входящие сообщения до тех пор, пока они не достигнут определенного состояния, поэтому он может выдавать одно сообщение вниз по потоку. Это сообщение действительно может быть с коллекцией в качестве полезной нагрузки.
Вот их документы для вашего рассмотрения:
https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter
Комментарии:
1. Приношу извинения за то, что не указал точную проблему в вопросе. Я пробовал использовать разделитель, но я думаю, что он может возвращать только коллекцию, а не итератор/поток сообщений.
2. Является ли отправка сообщений явно на канал, как указано выше, единственным способом?
3. Предположение неверно: разделитель принимает полезную нагрузку, подобную коллекции, и выдает свои элементы один за другим. Агрегатор делает обратное. И они оба точно подходят для вашей задачи. Пожалуйста, еще раз ознакомьтесь с документами об этих компонентах
4. Я написал пользовательский разделитель, который похож на разделитель файлов, но вместо разделения данных на итератор<Объект>, он разбивает поток файлов на пакеты с помощью разделителя. Возвращаемый тип пользовательского разделителя-Поток<Список<?><?>> . Можем ли мы иметь поддержку для предоставления размера сообщения в разделителе файлов, т. Е. Разделителе файлов(логический итератор, логические маркеры, логические маркеры JSON, размер пакета int)?
5. Нет, потому что это злоупотребление шаблоном разделителя. Агрегатор предназначен для пакетной обработки. Цель на самом деле состоит в том, чтобы прочитать файл строка за строкой. В большинстве случаев для файлов CVS. То, о чем вы просите, действительно может быть достигнуто с помощью агрегатора.