
Spring Batch: One reader, multiple processors and writers

In Spring Batch I need to pass the items read by an ItemReader to two different processors and writers. What I'm trying to achieve is this:

                            +---> ItemProcessor#1 ---> ItemWriter#1
                            |
    ItemReader ---> item ---+
                            |
                            +---> ItemProcessor#2 ---> ItemWriter#2

This is needed because the items written by ItemWriter#1 must be processed in a completely different way from the ones written by ItemWriter#2. Moreover, the ItemReader reads items from a database, and the queries it executes are so computationally expensive that executing the same query twice is not an option.

Any hints on how to achieve such a setup? Or, at least, a logically equivalent one?

danidemi asked Sep 25 '13 08:09




2 Answers

This solution applies if every item must be processed by both processor #1 and processor #2.

You have to create a processor #0 with this signature:

    class Processor0 implements ItemProcessor<Item, CompositeResultBean>

where CompositeResultBean is a bean defined as

    class CompositeResultBean {
        Processor1ResultBean result1;
        Processor2ResultBean result2;
        // getters and setters omitted
    }

In your processor #0, just delegate the work to processors #1 and #2 and put the results into the CompositeResultBean:

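    // Inside Processor0; processor1 and processor2 are the two delegate processors.
    public CompositeResultBean process(Item item) throws Exception {
        final CompositeResultBean r = new CompositeResultBean();
        // Both delegates receive the same item, so it is read only once.
        r.setResult1(processor1.process(item));
        r.setResult2(processor2.process(item));
        return r;
    }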

Your writer is then a CompositeItemWriter whose delegates each write one part of the bean: CompositeResultBean.result1 or CompositeResultBean.result2 (take a look at PropertyExtractingDelegatingItemWriter; it may help).
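The delegating writer described in this answer is not shown there. Below is a minimal sketch of the whole fan-out, assuming plain-Java stand-ins (`Processor`, `Writer`) for Spring Batch's `ItemProcessor` and `ItemWriter` so that it runs without the framework. `FanOutProcessor`, `FanOutWriter` and the generic form of `CompositeResultBean` are hypothetical names chosen for illustration, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for Spring Batch's ItemProcessor and ItemWriter (assumption:
// declared here only so the sketch compiles without the framework).
interface Processor<I, O> { O process(I item) throws Exception; }
interface Writer<T> { void write(List<? extends T> items) throws Exception; }

// Carries both processing results for one input item.
class CompositeResultBean<A, B> {
    final A result1;
    final B result2;
    CompositeResultBean(A result1, B result2) {
        this.result1 = result1;
        this.result2 = result2;
    }
}

// "Processor #0": runs both delegates on the same item.
class FanOutProcessor<I, A, B> implements Processor<I, CompositeResultBean<A, B>> {
    private final Processor<I, A> processor1;
    private final Processor<I, B> processor2;

    FanOutProcessor(Processor<I, A> processor1, Processor<I, B> processor2) {
        this.processor1 = processor1;
        this.processor2 = processor2;
    }

    @Override
    public CompositeResultBean<A, B> process(I item) throws Exception {
        return new CompositeResultBean<>(processor1.process(item), processor2.process(item));
    }
}

// Writer: unpacks each composite and routes each half to its own writer.
class FanOutWriter<A, B> implements Writer<CompositeResultBean<A, B>> {
    private final Writer<A> writer1;
    private final Writer<B> writer2;

    FanOutWriter(Writer<A> writer1, Writer<B> writer2) {
        this.writer1 = writer1;
        this.writer2 = writer2;
    }

    @Override
    public void write(List<? extends CompositeResultBean<A, B>> items) throws Exception {
        List<A> first = new ArrayList<>();
        List<B> second = new ArrayList<>();
        for (CompositeResultBean<A, B> item : items) {
            // A null half means that branch produced nothing for this item.
            if (item.result1 != null) first.add(item.result1);
            if (item.result2 != null) second.add(item.result2);
        }
        if (!first.isEmpty()) writer1.write(first);
        if (!second.isEmpty()) writer2.write(second);
    }
}
```

In a real step the two classes would implement the framework interfaces instead, and the chunk would be configured with the composite bean as its output type, so each item still passes through the reader exactly once.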

Luca Basso Ricci answered Oct 05 '22 07:10


I followed Luca's suggestion to use PropertyExtractingDelegatingItemWriter as the writer, and I was able to work with two different entities in a single step.

First, I defined a DTO that holds the two entities produced by the processor:

    public class DatabaseEntry {
        private AccessLogEntry accessLogEntry;
        private BlockedIp blockedIp;

        public AccessLogEntry getAccessLogEntry() {
            return accessLogEntry;
        }

        public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
            this.accessLogEntry = accessLogEntry;
        }

        public BlockedIp getBlockedIp() {
            return blockedIp;
        }

        public void setBlockedIp(BlockedIp blockedIp) {
            this.blockedIp = blockedIp;
        }
    }
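The answer does not show the processor that fills this DTO. Here is a hedged sketch of what such a processor might look like, with trimmed-down stand-ins for `AccessLogEntry`, `BlockedIp` and `DatabaseEntry`. The class name `LogEntryProcessor` and the blocking rule (flag an IP at the moment its request count reaches a threshold) are assumptions made for illustration and are not taken from the original code.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for the answer's entities; the real classes have more fields.
class AccessLogEntry {
    private final String ip;
    AccessLogEntry(String ip) { this.ip = ip; }
    String getIp() { return ip; }
}

class BlockedIp {
    private final String ip;
    private final String comment;
    BlockedIp(String ip, String comment) { this.ip = ip; this.comment = comment; }
    String getIp() { return ip; }
    String getComment() { return comment; }
}

class DatabaseEntry {
    private AccessLogEntry accessLogEntry;
    private BlockedIp blockedIp;
    AccessLogEntry getAccessLogEntry() { return accessLogEntry; }
    void setAccessLogEntry(AccessLogEntry accessLogEntry) { this.accessLogEntry = accessLogEntry; }
    BlockedIp getBlockedIp() { return blockedIp; }
    void setBlockedIp(BlockedIp blockedIp) { this.blockedIp = blockedIp; }
}

// Hypothetical processor: one log entry in, one DTO carrying up to two entities out.
class LogEntryProcessor {
    private final int threshold;
    private final Map<String, Integer> hitsPerIp = new HashMap<>();

    LogEntryProcessor(int threshold) { this.threshold = threshold; }

    DatabaseEntry process(AccessLogEntry entry) {
        DatabaseEntry result = new DatabaseEntry();
        // The log entry itself is always stored.
        result.setAccessLogEntry(entry);
        // Assumed rule: emit a BlockedIp once, when the IP reaches the threshold.
        int hits = hitsPerIp.merge(entry.getIp(), 1, Integer::sum);
        if (hits == threshold) {
            result.setBlockedIp(new BlockedIp(entry.getIp(), "reached " + threshold + " requests"));
        }
        return result;
    }
}
```

Leaving `blockedIp` as null for most items is what makes the null check in the writer's `saveTransaction` method necessary.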

Then I passed this DTO to the writer, a PropertyExtractingDelegatingItemWriter, in whose configuration class I defined two custom methods that write the entities to the database. See my writer code below:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import javax.sql.DataSource;

    import org.springframework.batch.item.adapter.PropertyExtractingDelegatingItemWriter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class LogWriter extends LogAbstract {
        @Autowired
        private DataSource dataSource;

        @Bean
        public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
            PropertyExtractingDelegatingItemWriter<DatabaseEntry> writer =
                    new PropertyExtractingDelegatingItemWriter<>();
            // These properties are read from each DatabaseEntry and passed,
            // in this order, as arguments to the target method.
            writer.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
            writer.setTargetObject(this);
            writer.setTargetMethod("saveTransaction");
            return writer;
        }

        public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
            writeAccessLogTable(accessLogEntry);
            // blockedIp is optional: not every log entry produces one.
            if (blockedIp != null) {
                writeBlockedIp(blockedIp);
            }
        }

        private void writeBlockedIp(BlockedIp entry) throws SQLException {
            String sql = "INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)";
            // try-with-resources closes the statement and the connection after the insert.
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, entry.getIp());
                // threshold, startDate and endDate are not declared in this class;
                // presumably they are inherited from LogAbstract (not shown).
                statement.setInt(2, threshold);
                statement.setTimestamp(3, Timestamp.valueOf(startDate));
                statement.setTimestamp(4, Timestamp.valueOf(endDate));
                statement.setString(5, entry.getComment());
                statement.execute();
            }
        }

        private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
            String sql = "INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)";
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
                statement.setString(2, entry.getIp());
                statement.setString(3, entry.getRequest());
                statement.setString(4, entry.getStatus());
                statement.setString(5, entry.getUserAgent());
                statement.execute();
            }
        }
    }

With this approach you get the behaviour originally asked for: a single reader feeds the processing of multiple entities, and all of them are saved in a single step.

Juan Pablo G answered Oct 05 '22 06:10