#apache-camel
#apache-camel
Вопрос:
Я хотел бы разработать маршрут, который опрашивает каталог, содержащий файлы CSV, и для каждого файла он разархивирует каждую строку с помощью Bindy и ставит ее в очередь в activemq.
Проблема в том, что файлы могут быть довольно большими (миллион строк), поэтому я бы предпочел ставить в очередь по одной строке за раз, но я получаю все строки в java.util.ArrayList в конце Bindy, что вызывает проблемы с памятью.
Пока что я провел небольшой тест, и демаршалинг работает, так что конфигурация Bindy с использованием аннотаций в порядке.
Вот маршрут:
from("file://data/inbox?noop=trueamp;maxMessagesPerPoll=1amp;delay=5000")
.unmarshal()
.bindy(BindyType.Csv, "com.ess.myapp.core")
.to("jms:rawTraffic");
Среда: Eclipse Indigo, Maven 3.0.3, Camel 2.8.0
Спасибо
Ответ №1:
Если вы используете разделитель EIP, то вы можете использовать режим потоковой передачи, что означает, что Camel будет обрабатывать файл построчно.
from("file://data/inbox?noop=trueamp;maxMessagesPerPoll=1amp;delay=5000")
.split(body().tokenize("n")).streaming()
.unmarshal().bindy(BindyType.Csv, "com.ess.myapp.core")
.to("jms:rawTraffic");
Комментарии:
1. Спасибо Клаусу за ваш ответ. Теперь я столкнулся с другой проблемой. Следуя моему небольшому упражнению, я пытаюсь извлечь из очереди и записать в файл с помощью
.convertBodyTo(String.class).to("file:data/outbox?fileExist=Append")
, но записывается только первая строка. Все равно, если я использую переопределение параметра file, я получаю только последнюю строку. Есть ли способ записать в файл все строки из CSV-файла?. Спасибо2. Вам нужно указать имя файла .to(«файл: данные / исходящие? fileName=data.csvamp;fileExist=Append»)
3. Добавить
.thread()
после.streaming()
, может ли это быть более эффективным?4. Разделитель EIP имеет встроенную поддержку многопоточности, вы можете обратиться к ExecutorService. Это было бы более идеальным для использования, чем потоки. Но возможно и последнее. Смотрите примеры в Camel docs.
5. Я вижу маркировку новой строкой. Как насчет многострочных строк? Поддерживаются ли?
Ответ №2:
Для записи и для других пользователей, которые, возможно, искали это так же усердно, как и я, между тем, кажется, есть более простой метод, который также хорошо работает с картами использования:
CsvDataFormat csv = new CsvDataFormat()
.setLazyLoad(true)
.setUseMaps(true);
from("file://data/inbox?noop=trueamp;maxMessagesPerPoll=1amp;delay=5000")
.unmarshal(csv)
.split(body()).streaming()
.to("log:mappedRow?multiline=true");
Ответ №3:
Использование как разделителя, так и агрегатора EIPs было бы лучшей стратегией для обработки больших CSV-файлов в Apache Camel. Подробнее об этом читайте в форме составленного процессора сообщений
Вот пример использования Java DSL:
package com.camel;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.QuoteMode;
public class FileSplitter {
public static void main(String args[]) throws Exception {
CamelContext context = new DefaultCamelContext();
CsvDataFormat csvParser = new CsvDataFormat(CSVFormat.DEFAULT);
csvParser.setSkipHeaderRecord(true);
csvParser.setQuoteMode(QuoteMode.ALL);
context.addRoutes(new RouteBuilder() {
public void configure() {
String fileName = "Hello.csv";
int lineCount = 20;
System.out.println("fileName = " fileName);
System.out.println("lineCount = " lineCount);
from("file:data/inbox?noop=trueamp;fileName=" fileName).unmarshal(csvParser).split(body()).streaming()
.aggregate(constant(true), new ArrayListAggregationStrategy()).completionSize(lineCount)
.completionTimeout(1500).marshal(csvParser)
.to("file:data/outbox?fileName=${file:name.noext}_${header.CamelSplitIndex}.csv");
}
});
context.start();
Thread.sleep(10000);
context.stop();
System.out.println("End");
}
}