#spring #spring-batch
#spring #пакет spring-batch
Вопрос:
У меня есть плоский файл содержимого фиксированной длины, который содержит примеры записей, как показано ниже, и не имеет разделителя, поэтому он содержит специальные шестнадцатеричные символы, и данные также распределены по нескольким строкам. Но каждая строка имеет постоянные 2000 байт / символов, и мне нужно продолжать выбирать байты из 1-2000
, 2001-4000
и так далее. Я исправил символы индекса.
Примечание — Здесь я не хочу читать все символы из 2000 строк, просто хотел прочитать на основе диапазона.
Customer.java
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Customer {
private String firstValue;
private String secondValue;
private String thirdValue;
private String fourthValue;
}
Ошибка
Java Bean
@Bean
public FlatFileItemReader<Customer> customerItemReader(){
return new FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader")
.linesToSkip(1)
.resource(new ClassPathResource("/data/test.conv"))
.fixedLength()
.columns(new Range[] { new Range(3, 6), new Range(7, 13), new Range(14, 15), new Range(14, 15) })
.names(new String[] { "firstValue", "secondValue", "thirdValue", "fourthValue" })
.targetType(Customer.class)
.build();
}
Ошибка
org.springframework.batch.item.file.FlatFileParseException: Parsing error at line: 2 in resource=[class path resource [data/test.conv]], input=[560000000000411999999992052300000000D 0000 0000000000010000000100000040000000000000 00000000 NYNNVX N N 0 N004 000100000001000100000001000100000001000100000001000100000001000100000001000100000001 YNYNYYNNNNNYNNNN0004000000070000000300010000000000000000000000020000000000000000NN1N N00NNNND 001NNN 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000O840000000000000000000AN0201000000NNNC840 N N00N A NN00400000000NNNNNUSAN NNNN00000000000000NN141900INNNNNN N N000000 NN 200//0055//20000YNN MO ¶200528000000 !!B3K555800000001A****00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 0005230000000000000000 ]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:189) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:93) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at com.sun.proxy.$Proxy57.run(Unknown Source) [na:na]
at com.example.DatabaseOutputApplication.run(DatabaseOutputApplication.java:39) [classes/:na]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at com.example.DatabaseOutputApplication.main(DatabaseOutputApplication.java:29) [classes/:na]
Caused by: org.springframework.batch.item.file.transform.IncorrectLineLengthException: Line is longer than max range 15
at org.springframework.batch.item.file.transform.FixedLengthTokenizer.doTokenize(FixedLengthTokenizer.java:113) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.item.file.transform.AbstractLineTokenizer.tokenize(AbstractLineTokenizer.java:130) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.item.file.mapping.DefaultLineMapper.mapLine(DefaultLineMapper.java:43) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:185) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
Я также пробовал это
@Bean
public FlatFileItemReader<Customer> customerItemReader(){
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("firstValue", "secondValue", "thirdValue", "fourthValue", "fifthValue", "sixthValue", "seventhValue", "eighthValue", "ninethValue", "dummyRange");
tokenizer.setColumns(
new Range(3, 6), new Range(7, 13), new Range(14,15), new Range(16,24), new Range(25, 28), new Range(29,32), new Range(33, 36), new Range(1322, 1324),
new Range(1406, 1408), new Range(1409));
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/test.conv"));
reader.setLineMapper(customerLineMapper);
reader.setStrict(false);
return reader;
}
Это решение не работает, когда у вас нет разделителя, а ваши данные распределены по нескольким строкам. здесь индекс 1406 column присутствует в другой строке, а разделитель сгенерирован в файле мейнфреймом. Пожалуйста, укажите здесь.
Комментарии:
1. Я думаю, что в этом случае вам нужен пользовательский ридер. У Майкла есть образец приема мэйнфрейма здесь: github.com/mminella/mainframeingestion что может помочь.
2. @MahmoudBenHassine — Конечно, я посмотрю репо Майкла, однако я реализовал здесь некоторую логику, не могли бы вы проверить: github.com/mminella/scaling-demos/issues/7 ?
Ответ №1:
Основная проблема здесь в том, что FlatFileItemReader
предполагается, что у вас есть разрывы строк, которых у вас нет. Самое простое решение для меня — скопировать / вставить класс и заменить readLine()
метод на тот, который принимает соответствующее количество символов. К сожалению, поскольку большая часть класса является частной, вы не можете легко расширить и переопределить.
package org.springframework.batch.item.file;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.file.separator.RecordSeparatorPolicy;
import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
/**
* Modified version of {@link FlatFileItemWriter} which reads in mainframe style files with fixed width and no line breaks.
* Still a restartable {@link ItemReader} that reads lines from input {@link #setResource(Resource)}. Line is defined by the
* {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} and mapped to item using {@link #setLineMapper(LineMapper)}.
* If an exception is thrown during line mapping it is rethrown as {@link FlatFileParseException} adding information
* about the problematic line and its line number.
*
* @author Robert Kasanicky
* @author Dean Clark
*/
public class FixedLengthFlatFileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements ResourceAwareItemReaderItemStream<T>, InitializingBean {
private static final Log logger = LogFactory.getLog(FlatFileItemReader.class);
// default encoding for input files
public static final String DEFAULT_CHARSET = Charset.defaultCharset().name();
private RecordSeparatorPolicy recordSeparatorPolicy = new SimpleRecordSeparatorPolicy();
private Resource resource;
private BufferedReader reader;
private int lineCount = 0;
private String[] comments = new String[] { "#" };
private boolean noInput = false;
private String encoding = DEFAULT_CHARSET;
private LineMapper<T> lineMapper;
private int linesToSkip = 0;
private LineCallbackHandler skippedLinesCallback;
private boolean strict = true;
private BufferedReaderFactory bufferedReaderFactory = new DefaultBufferedReaderFactory();
// CHANGE: Added a variable to store Line Length
private Integer lineLength;
public FixedLengthFlatFileItemReader() {
setName(ClassUtils.getShortName(FlatFileItemReader.class));
}
/**
* In strict mode the reader will throw an exception on
* {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist.
* @param strict <code>true</code> by default
*/
public void setStrict(final boolean strict) {
this.strict = strict;
}
/**
* @param skippedLinesCallback will be called for each one of the initial skipped lines before any items are read.
*/
public void setSkippedLinesCallback(final LineCallbackHandler skippedLinesCallback) {
this.skippedLinesCallback = skippedLinesCallback;
}
/**
* Public setter for the number of lines to skip at the start of a file. Can be used if the file contains a header
* without useful (column name) information, and without a comment delimiter at the beginning of the lines.
*
* @param linesToSkip the number of lines to skip
*/
public void setLinesToSkip(final int linesToSkip) {
this.linesToSkip = linesToSkip;
}
/**
* Setter for line mapper. This property is required to be set.
* @param lineMapper maps line to item
*/
public void setLineMapper(final LineMapper<T> lineMapper) {
this.lineMapper = lineMapper;
}
/**
* Setter for the encoding for this input source. Default value is {@link #DEFAULT_CHARSET}.
*
* @param encoding a properties object which possibly contains the encoding for this input file;
*/
public void setEncoding(final String encoding) {
this.encoding = encoding;
}
/**
* Factory for the {@link BufferedReader} that will be used to extract lines from the file. The default is fine for
* plain text files, but this is a useful strategy for binary files where the standard BufferedReaader from java.io
* is limiting.
*
* @param bufferedReaderFactory the bufferedReaderFactory to set
*/
public void setBufferedReaderFactory(final BufferedReaderFactory bufferedReaderFactory) {
this.bufferedReaderFactory = bufferedReaderFactory;
}
/**
* Setter for comment prefixes. Can be used to ignore header lines as well by using e.g. the first couple of column
* names as a prefix.
*
* @param comments an array of comment line prefixes.
*/
public void setComments(final String[] comments) {
this.comments = new String[comments.length];
System.arraycopy(comments, 0, this.comments, 0, comments.length);
}
/**
* Public setter for the input resource.
*/
@Override
public void setResource(final Resource resource) {
this.resource = resource;
}
/**
* Public setter for the recordSeparatorPolicy. Used to determine where the line endings are and do things like
* continue over a line ending if inside a quoted string.
*
* @param recordSeparatorPolicy the recordSeparatorPolicy to set
*/
public void setRecordSeparatorPolicy(final RecordSeparatorPolicy recordSeparatorPolicy) {
this.recordSeparatorPolicy = recordSeparatorPolicy;
}
/**
* @return string corresponding to logical record according to
* {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} (might span multiple lines in file).
*/
@Override
protected T doRead() throws Exception {
if (noInput) {
return null;
}
final String line = readLine();
if (line == null) {
return null;
} else {
try {
return lineMapper.mapLine(line, lineCount);
} catch (final Exception ex) {
throw new FlatFileParseException("Parsing error at line: " lineCount " in resource=["
resource.getDescription() "], input=[" line "]", ex, line, lineCount);
}
}
}
/**
* @return next line (skip comments).getCurrentResource
*/
// CHANGE: Modified readLine() to pull in a set number of characters
private String readLine() {
if (reader == null) {
throw new ReaderNotOpenException("Reader must be open before it can be read.");
}
String line = null;
try {
// CHANGE: READ IN LINE BASED ON LINE LENGTH
final char[] chars = new char[lineLength 1];
final int charsRead = reader.read(chars, 0, lineLength);
if (charsRead <= 10) {
noInput = true;
return null;
}
line = new String(chars);
// END CHANGE: READ IN LINE BASED ON LINE LENGTH
lineCount ;
while (isComment(line)) {
line = reader.readLine();
if (line == null) {
return null;
}
lineCount ;
}
line = applyRecordSeparatorPolicy(line);
} catch (final IOException e) {
// Prevent IOException from recurring indefinitely
// if client keeps catching and re-calling
noInput = true;
throw new NonTransientFlatFileException("Unable to read from resource: [" resource "]", e, line,
lineCount);
}
return line;
}
private boolean isComment(final String line) {
for (final String prefix : comments) {
if (line.startsWith(prefix)) {
return true;
}
}
return false;
}
@Override
protected void doClose() throws Exception {
lineCount = 0;
if (reader != null) {
reader.close();
}
}
@Override
protected void doOpen() throws Exception {
Assert.notNull(resource, "Input resource must be set");
Assert.notNull(recordSeparatorPolicy, "RecordSeparatorPolicy must be set");
noInput = true;
if (!resource.exists()) {
if (strict) {
throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " resource);
}
logger.warn("Input resource does not exist " resource.getDescription());
return;
}
if (!resource.isReadable()) {
if (strict) {
throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode): "
resource);
}
logger.warn("Input resource is not readable " resource.getDescription());
return;
}
reader = bufferedReaderFactory.create(resource, encoding);
for (int i = 0; i < linesToSkip; i ) {
final String line = readLine();
if (skippedLinesCallback != null) {
skippedLinesCallback.handleLine(line);
}
}
noInput = false;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(lineMapper, "LineMapper is required");
// CHANGE: Added an assertion to verify Line Length was provided
Assert.notNull(lineLength, "Line length is required");
}
@Override
protected void jumpToItem(final int itemIndex) throws Exception {
for (int i = 0; i < itemIndex; i ) {
readLine();
}
}
private String applyRecordSeparatorPolicy(String line) throws IOException {
String record = line;
while ((line != null) amp;amp; !recordSeparatorPolicy.isEndOfRecord(record)) {
line = this.reader.readLine();
if (line == null) {
if (StringUtils.hasText(record)) {
// A record was partially complete since it hasn't ended but
// the line is null
throw new FlatFileParseException("Unexpected end of file before record complete", record, lineCount);
} else {
// Record has no text but it might still be post processed
// to something (skipping preProcess since that was already
// done)
break;
}
} else {
lineCount ;
}
record = recordSeparatorPolicy.preProcess(record) line;
}
return recordSeparatorPolicy.postProcess(record);
}
// CHANGE: Added a setter for Line Length
public void setLineLength(final Integer lineLength) {
this.lineLength = lineLength;
}
}
Комментарии:
1. Похоже, это единственный оставшийся вариант.
Ответ №2:
Вы можете использовать флаг isStrict, чтобы ознакомиться с этой проблемой — используйте токенизатор отдельно, и ваша проблема будет решена
Я реализовал — fixedLengthTokenizer, как показано ниже
@Value("classpath:mainframe.txt")
private Resource resource;
@Bean
public FlatFileItemReader fixLengthItemReader(){
FlatFileItemReader reader = new FlatFileItemReader();
reader.setResource(resource);
reader.setLineMapper(new DefaultLineMapper() {
{
setLineTokenizer(fixedLengthTokenizer());
setFieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {
{
setTargetType(Customer.class);
}
});
}
});
return reader;
}
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setColumns(new Range[] { new Range(3, 6), new Range(7, 13), new Range(14, 15), new Range(14, 15) });
tokenizer.setNames(new String[] { "firstValue", "secondValue", "thirdValue", "fourthValue" });
tokenizer.setStrict(false);
return tokenizer;
}
Примечание — не забудьте установить этот флаг -> tokenizer.setStrict(false);