
Spring batch to aggregate values and write single value

I am using Spring Batch and I need to achieve the following:

  1. Read a CSV file which has details like date and amount
  2. Aggregate the sum of all amounts for the same date
  3. Persist one entry with date and the sum

I have used batch in the past and I thought of the following approach. Create a batch with 2 steps.

Step 1:

  1. Reader: Loop through the entire file using FlatFileItemReader
  2. Processor: Populate a map with Key as date and value as amount. If entry is present then get the value and add it to the new value
  3. Writer: No operation writer as I do not wish to write

Step 2:

  1. Reader: Loop through the values of the map
  2. Writer: Persist the values

I was able to achieve step 1, where I populated the Map. This Map has been declared with @JobScope.

I am stuck on how to create the reader for step 2, which just needs to read the list of values. I tried ListItemReader, but I am not able to access the Map from the ListItemReader.

Please advise a solution, or suggest a better approach to tackle this.

Thanks

Pratik Shelar asked Jul 22 '15 08:07



1 Answer

Option 1: If your CSV is already sorted by date, you could implement a group reader that reads lines until the key value changes. The whole group can then be passed to the processor as one item.

Such a group reader could look like this:

  private SingleItemPeekableItemReader<I> reader;
  private ItemReader<I> peekReaderDelegate;

  @Override
  public void afterPropertiesSet() throws Exception {
    Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null");
    this.reader= new SingleItemPeekableItemReader<I>();
    this.reader.setDelegate(peekReaderDelegate);
  }

  @Override
  // GroupDTO is just a simple container. It is also possible to use
  // List<I> instead of GroupDTO<I>
  public GroupDTO<I> read() throws Exception {
    State state = State.NEW; // a simple enum with the states NEW, READING, and COMPLETE
    GroupDTO<I> group = null;
    I item = null;

    while (state != State.COMPLETE) {
      item = reader.read();

      switch (state) {
        case NEW: {
          if (item == null) {
            // end reached
            state = State.COMPLETE;
            break;
          }

          group = new GroupDTO<I>();
          group.addItem(item);
          state = State.READING;
          I nextItem = reader.peek();
          // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group
          if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) {
            state = State.COMPLETE;
          }
          break;
        }
        case READING: {
          group.addItem(item);

          // peek and check whether the next entry starts a new group
          I nextItem = reader.peek();
          // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group
          if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) {
            state = State.COMPLETE;
          }
          break;
        }
        default: {
          throw new IllegalStateException("Reader is in an invalid state: " + state);
        }
      }
    }

    return group;
  }

You need a SingleItemPeekableItemReader to look ahead at the next element without consuming it. It wraps your normal reader.
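
To make the group-break idea concrete for the date/amount case, here is a minimal plain-Java sketch with no Spring Batch dependency. The `Row` class, the `sumByDate` method, and the sample values are assumptions made for illustration only; a one-item look-ahead over an `Iterator` stands in for the peekable reader, and the summing is what a processor could do with each group.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class GroupBreakSketch {

    // Hypothetical holder for one CSV line: a date string and an amount.
    public static final class Row {
        public final String date;
        public final BigDecimal amount;

        public Row(String date, BigDecimal amount) {
            this.date = date;
            this.amount = amount;
        }
    }

    // Sums consecutive rows that share a date. The input must be sorted by date.
    public static List<Row> sumByDate(List<Row> sortedRows) {
        List<Row> totals = new ArrayList<>();
        Iterator<Row> it = sortedRows.iterator();
        Row peeked = it.hasNext() ? it.next() : null; // one-item look-ahead

        while (peeked != null) {
            String date = peeked.date;
            BigDecimal sum = BigDecimal.ZERO;
            // Consume rows until the look-ahead row has a different date (the group break).
            while (peeked != null && peeked.date.equals(date)) {
                sum = sum.add(peeked.amount);
                peeked = it.hasNext() ? it.next() : null;
            }
            totals.add(new Row(date, sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("2015-07-21", new BigDecimal("10.00")),
            new Row("2015-07-21", new BigDecimal("5.50")),
            new Row("2015-07-22", new BigDecimal("7.25")));
        for (Row total : sumByDate(rows)) {
            System.out.println(total.date + " " + total.amount);
        }
    }
}
```

With the sample rows this prints one line per date (`2015-07-21 15.50`, then `2015-07-22 7.25`). Only one group is held in memory at a time, which is the main advantage over collecting everything in a map, but it only works if the file really is sorted by date.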

Option 2: Keep step 1 as you proposed, but simply write a tasklet for step 2. There is no need for the reader-processor-writer approach here; a simple tasklet can write the content of your map to a file.
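
A rough sketch of the work such a tasklet would do, written as plain Java so it runs without Spring Batch. In a real job this logic would sit inside the `execute` method of a `Tasklet`; the class name, method names, and the `date,sum` line format below are assumptions for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WriteTotalsSketch {

    // Formats every (date, sum) entry as one "date,sum" line, ordered by date.
    public static List<String> toLines(Map<LocalDate, Integer> totals) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<LocalDate, Integer> entry : new TreeMap<>(totals).entrySet()) {
            lines.add(entry.getKey() + "," + entry.getValue());
        }
        return lines;
    }

    // Stand-in for the tasklet body: writes the whole map in one pass, no chunking.
    public static void writeTotals(Map<LocalDate, Integer> totals, Path target) throws IOException {
        Files.write(target, toLines(totals));
    }

    public static void main(String[] args) throws IOException {
        Map<LocalDate, Integer> totals = new HashMap<>();
        totals.put(LocalDate.parse("2015-07-22"), 7);
        totals.put(LocalDate.parse("2015-07-21"), 15);

        Path target = Files.createTempFile("totals", ".csv");
        writeTotals(totals, target);
        System.out.println(Files.readAllLines(target));
    }
}
```

Since the map holds only one entry per date, writing it in a single pass is usually fine; chunking would only start to matter if the number of distinct dates became very large.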

Option 3: If you really want to use a reader-processor-writer approach for step 2, write your own reader that iterates over your map.

Something like this (I did not test this code):

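import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.springframework.batch.item.ItemReader;
import org.springframework.util.Assert;

public class MapReader implements ItemReader<Map.Entry<Date, Integer>> {

     private MapContainer container;
     private Iterator<Map.Entry<Date, Integer>> mapIterator;

     @PostConstruct
     public void afterPropertiesSet() {
        Assert.notNull(container, "The 'container' must not be null");
     }

     public void setMapContainer(MapContainer container) {
         this.container = container;
     }

     @Override
     public Map.Entry<Date, Integer> read() {
        // Create the iterator on the first read, i.e. after step 1 has filled
        // the map. An iterator created earlier would fail once the map changes.
        if (mapIterator == null) {
           mapIterator = container.getMap().entrySet().iterator();
        }
        if (mapIterator.hasNext()) {
           return mapIterator.next();
        }
        return null; // null tells Spring Batch that the input is exhausted
      }
}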

@Component
public class MapContainer {
    private Map<Date, Integer> data = new HashMap<>();

    public Map<Date, Integer> getMap() {
        return data;
    }

    // add modifier method as needed for step 1

}

So you create a single Spring bean instance of the container, inject it into the processor of step 1 and fill it there, and also inject it into the reader above.
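
The interplay between filling the container and reading it back can be sketched in plain Java, without Spring. The classes below are simplified stand-ins that reuse the names from the answer; the `add` modifier method and the `String` date keys are assumptions for illustration, and in a real job Spring would do the wiring that `main` does by hand.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class SharedMapSketch {

    // Plain stand-in for the shared container bean (no Spring annotations here).
    public static class MapContainer {
        private final Map<String, Integer> data = new HashMap<>();

        // Hypothetical modifier for step 1: adds the amount to the date's running total.
        public void add(String date, int amount) {
            data.merge(date, amount, Integer::sum);
        }

        public Map<String, Integer> getMap() {
            return data;
        }
    }

    // Stand-in for the step 2 reader: returns one entry per call, then null.
    public static class MapReader {
        private final MapContainer container;
        private Iterator<Map.Entry<String, Integer>> iterator;

        public MapReader(MapContainer container) {
            this.container = container;
        }

        public Map.Entry<String, Integer> read() {
            if (iterator == null) { // created on first read, after the map is filled
                iterator = container.getMap().entrySet().iterator();
            }
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    public static void main(String[] args) {
        MapContainer container = new MapContainer();
        MapReader reader = new MapReader(container); // both share one container instance

        // "Step 1": the processor would call add(...) once per CSV line.
        container.add("2015-07-21", 10);
        container.add("2015-07-21", 5);
        container.add("2015-07-22", 7);

        // "Step 2": the framework would call read() until it returns null.
        Map.Entry<String, Integer> entry;
        while ((entry = reader.read()) != null) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

The reader can be constructed before the map is filled because the iterator is only created on the first `read()` call. `Map.merge` replaces the "get the value and add it to the new value" logic described in the question with a single call.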

Hansjoerg Wingeier answered Oct 11 '22 14:10
