To make it simple, let's suppose I have an ItemReader that returns me 25 rows.
The first 10 rows belong to student A
The next 5 belong to student B
and the 10 remaining belong to student C
I want to aggregate them together logically say by studentId and flatten them to end up with one row per student.
If I understand correctly, setting the commit interval to 5 will do the following:
If that is true, then for the next five I will have to check the already written ones, get them out aggregate them to the ones that I am currently processing and write them again.
I personally do no like that.
Sometimes I feel that it is much easier to write a regular Spring JDBC main program and then I have full control of what I want to do. However, I wanted to take advantage of of the job repository state monitoring of the job, ability to restart, skip, job and step listeners....
My module-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<description>Example job to get you started. It provides a skeleton for a typical batch application.</description>
<batch:job id="job1">
<batch:step id="step1" >
<batch:tasklet transaction-manager="transactionManager" start-limit="100" >
<batch:chunk reader="attendanceItemReader"
processor="attendanceProcessor"
writer="attendanceItemWriter"
commit-interval="10"
/>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource">
<ref bean="sourceDataSource"/>
</property>
<property name="sql"
value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
<property name="preparedStatementSetter" ref="attendanceStatementSetter"/>
<property name="rowMapper" ref="attendanceRowMapper"/>
</bean>
<bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>
<bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>
<bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />
<bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:target/outputs/passthrough.txt"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
</property>
</bean>
</beans>
My supporting classes for the Reader.
A PreparedStatementSetter
package edu.kdc.visioncards.preparedstatements;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.jdbc.core.PreparedStatementSetter;
public class AttendanceStatementSetter implements PreparedStatementSetter {
public void setValues(PreparedStatement ps) throws SQLException {
ps.setInt(1, 7);
}
}
and a RowMapper
package edu.kdc.visioncards.rowmapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import edu.kdc.visioncards.dto.AttendanceDTO;
public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {
public static final String STUDENT_NAME = "STUDENT_NAME";
public static final String STUDENT_ID = "STUDENT_ID";
public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";
public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {
AttendanceDTO dto = new AttendanceDTO();
dto.setStudentId(rs.getString(STUDENT_ID));
dto.setStudentName(rs.getString(STUDENT_NAME));
dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));
return dto;
}
}
My processor
package edu.kdc.visioncards;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.item.ItemProcessor;
import edu.kdc.visioncards.dto.AttendanceDTO;
public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {
private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();
public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {
if(map.containsKey(new Integer(dto.getStudentId()))){
AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());
}else{
map.put(new Integer(dto.getStudentId()), dto);
}
return map;
}
}
In the Processor, I create a HashMap and as I process the rows I check whether I already have that Student in the Map, if it's not there I add it. If it's already there I grab the it get the values that I am interested in and add them with the row that I am currently processing.
After that, Spring Batch Framework writes to a File according to my configuration
My question is as follows:
Interface ItemReader<T> Strategy interface for providing the data. Implementations are expected to be stateful and will be called multiple times for each batch, with each call to read() returning a different value and finally returning null when all input data is exhausted.
ItemWriter. It is the element of the step of a batch process which writes data. An ItemWriter writes one item a time. Spring Batch provides an Interface ItemWriter. All the writers implement this interface.
The default SQL ItemReader implementations invoke a RowMapper to return objects, keep track of the current row if restart is required, store basic statistics, and provide some transaction enhancements that are explained later.
FlatFileItemWriter is the ItemWriter implementation provided to generate text file output. Similar to FlatFileItemReader in many respects, this class addresses the issues with file-based output in Java with a clean, consistent interface for you to use.
In my application I created a CollectingJdbcCursorItemReader
that extends the standard JdbcCursorItemReader
and performs exactly what you need. Internally it uses my CollectingRowMapper
: an extension of the standard RowMapper
that maps multiple related rows to one object.
Here is the code of the ItemReader, the code of CollectingRowMapper
interface, and an abstract implementation of it, is available in another answer of mine.
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;
/**
* A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
* Like the superclass this reader is not thread-safe.
*
* @author Pino Navato
**/
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {
private CollectingRowMapper<T> rowMapper;
private boolean firstRead = true;
/**
* Accepts a {@link CollectingRowMapper} only.
**/
@Override
public void setRowMapper(RowMapper<T> rowMapper) {
this.rowMapper = (CollectingRowMapper<T>)rowMapper;
super.setRowMapper(rowMapper);
}
/**
* Read next row and map it to item.
**/
@Override
protected T doRead() throws Exception {
if (rs == null) {
throw new ReaderNotOpenException("Reader must be open before it can be read.");
}
try {
if (firstRead) {
if (!rs.next()) { //Subsequent calls to next() will be executed by rowMapper
return null;
}
firstRead = false;
} else if (!rowMapper.hasNext()) {
return null;
}
T item = readCursor(rs, getCurrentItemCount());
return item;
}
catch (SQLException se) {
throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
}
}
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
T result = super.readCursor(rs, currentRow);
setCurrentItemCount(rs.getRow());
return result;
}
}
You can use it just like the classic JdbcCursorItemReader
: the only requirement is that you provide it a CollectingRowMapper
instead of the classic RowMapper
.
I always follow this pattern:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With