Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does one open a Reader when implementing ItemReader in a Spring Batch project?

In a Spring Batch project I need to compose a record out of multiple lines. I'm implementing ItemReader to accumulate multiple lines before returning an object. After working through several example projects I have pieced this together but I am faced with a ReaderNotOpenException.

I have triple checked the path to the file is correct. When I debug the delegate contains the resource and file path from my configuration file.

Any help appreciated.

Config file:

    <bean id="cvsFileItemReader" class="com.mkyong.XYZFileRecordReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="classpath:ma/report-headeronly.psv" />
            <property name="lineMapper">
                <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                    <property name="lineTokenizer">
                        <bean
                            class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                            <property name="delimiter" value="|" />
                        </bean>
                    </property>
                    <property name="fieldSetMapper">
                        <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" />
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>

My Reader:

package com.mkyong;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.transform.FieldSet;

import com.mkyong.model.XYZFileHeaderRecord;

public class XYZFileRecordReader implements ItemReader<XYZFileHeaderRecord>, ItemStream {

private FlatFileItemReader<FieldSet> delegate;

@Override
public XYZFileHeaderRecord read() throws Exception,
        UnexpectedInputException, ParseException,
        NonTransientResourceException {

    XYZFileHeaderRecord maFileHeaderRecord = new XYZFileHeaderRecord();

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String firstToken = line.readString(0);
        if (firstToken.equals("File ID")) {
            maFileHeaderRecord.setFileName( line.readString(1) );
        } else if (firstToken.equals("Date")) {
            maFileHeaderRecord.setDate( line.readString(1) );
            return maFileHeaderRecord;
        }
    }

    return null;
}

@Override
public void close() throws ItemStreamException {}

@Override
public void open(ExecutionContext arg0) throws ItemStreamException {}

@Override
public void update(ExecutionContext arg0) throws ItemStreamException {}

public FlatFileItemReader<FieldSet> getDelegate() {
    return delegate;
}

public void setDelegate(FlatFileItemReader<FieldSet> delegate) {
    this.delegate = delegate;
}
}

And my stacktrace:

SEVERE: Encountered an error executing the step
org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
    at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195)
    at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83)
    at com.mkyong.XYZFileRecordReader.read(XYZFileRecordReader.java:26)
    at com.mkyong.XYZFileRecordReader.read(XYZFileRecordReader.java:1)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:155)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:114)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:108)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:395)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:131)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:267)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:77)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:253)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:137)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:152)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:131)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:301)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:134)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:127)
    at com.mkyong.App.main(App.java:27)
Apr 25, 2014 5:35:56 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=reportJob]] completed with the following parameters: [{}] and the following status: [FAILED]
Exit Status : FAILED
Done
like image 454
yamori Avatar asked Apr 25 '14 21:04

yamori


2 Answers

Your delegate isn't getting opened. The easiest way to address this is to update the open, close, and update methods to call the corresponding methods on the delegate as well. This also allows for restartability (which your current version would not because the state of the delegate is not being saved):

@Override
public void close() throws ItemStreamException {
   delegate.close();
}

@Override
public void open(ExecutionContext arg0) throws ItemStreamException {
    delegate.open(arg0);
}

@Override
public void update(ExecutionContext arg0) throws ItemStreamException {
    delegate.update(arg0);
}

The alternative is to register your FlatFileItemReader as a stream in your step. You'll have to pull it out to a separate bean definition if you want to go that route.

You can read more about ItemStreams and how their lifecycle works and how it is impacted via delegation here: http://docs.spring.io/spring-batch/reference/html-single/index.html#itemStream

like image 53
Michael Minella Avatar answered Oct 05 '22 00:10

Michael Minella


You have to call delegate.open() to perform the open of real reader. Or you can register delegate reader as streaming to let SB manage the delegate reader stream lifecycle (read chapter 6.5)

like image 36
Luca Basso Ricci Avatar answered Oct 04 '22 23:10

Luca Basso Ricci