
JSR 352: How to collect data from the Writer of each Partition of a Partitioned Step?

I have a step with 2 partitions that writes to a database. I want to record the number of rows written by each partition, sum them, and print the total to the log.

I was thinking of using a static variable in the Writer and using the Step Context/Job Context to get it in afterStep() of the Step Listener. However, when I tried it I got null. I am able to get these values in close() of the Reader.

Is this the right way to go about it? Or should I use a Partition Collector/Reducer/Analyzer?

I am using Java Batch on WebSphere Liberty, and I am developing in Eclipse.

Fazil Hussain asked Feb 06 '23 18:02


1 Answer

I was thinking of using a static variable in the Writer and use Step Context/Job Context to get it in afterStep() of the Step Listener. However when I tried it I got null.

The ItemWriter might already be destroyed at this point, but I'm not sure.

Is this the right way to go about it?

Yes, it should be good enough. However, you need to ensure the total row count is shared across all partitions, because the batch runtime maintains a StepContext clone per partition. You should use JobContext instead.
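As a minimal sketch of the shared-counter idea from the question, assuming all partitions run as threads in the same JVM (which is not guaranteed in every runtime configuration), a static AtomicLong avoids depending on any per-partition context. RowCounter and RowCounterDemo are hypothetical names and are not part of JSR 352; the demo only simulates two partitions with plain threads.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of JSR 352: a JVM-wide counter that every
// partition's writer adds to. Assumes all partitions run in the same JVM.
final class RowCounter {
    private static final AtomicLong TOTAL = new AtomicLong();

    private RowCounter() {}

    // Intended to be called from writeItems() with items.size().
    static void add(int rows) {
        TOTAL.addAndGet(rows);
    }

    // Intended to be called from afterStep(); returns the total and
    // resets the counter so the next execution starts from zero.
    static long getAndReset() {
        return TOTAL.getAndSet(0);
    }
}

// Simulates two partitions writing concurrently from separate threads.
final class RowCounterDemo {
    static long run() {
        Thread p0 = new Thread(() -> { for (int i = 0; i < 3; i++) RowCounter.add(10); });
        Thread p1 = new Thread(() -> { for (int i = 0; i < 2; i++) RowCounter.add(10); });
        p0.start();
        p1.start();
        try {
            p0.join();
            p1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return RowCounter.getAndReset();
    }
}
```

In a real job the writer would call RowCounter.add(items.size()) and the step listener would call RowCounter.getAndReset(). Note that a chunk rolled back after writeItems() has run would still be counted, so the figure is only approximate when failures occur.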

I think using PartitionCollector and PartitionAnalyzer is a good choice, too. The PartitionCollector interface has a method collectPartitionData() that collects data from its own partition. The batch runtime then passes this data to the PartitionAnalyzer, which analyzes it. Note that there are:

  • N PartitionCollector per step (1 per partition)
  • N StepContext per step (1 per partition)
  • 1 PartitionAnalyzer per step

The number of records written can be passed via the StepContext's transient user data. Since each StepContext is reserved for its own partition, the transient user data won't be overwritten by other partitions.
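The flow described above can be sketched in plain Java without the batch runtime. This is a simulation under stated assumptions, not the javax.batch API: Slot stands in for one partition's transient user data, drain() plays the collector role, and Analyzer plays the analyzer role; all three names are hypothetical. It assumes the collector is invoked after each checkpoint and once more at the end of the partition, which is why the slot is cleared on every read.

```java
import java.util.List;

// Plain-Java simulation; none of these classes belong to javax.batch.
final class PartitionSim {

    // Stands in for one partition's StepContext transient user data.
    static final class Slot {
        private Integer rows; // null means "nothing to report"

        // Writer role: record the rows of the chunk just written.
        void add(int n) {
            rows = (rows == null ? 0 : rows) + n;
        }

        // Collector role: hand over the count and clear it, so the next
        // call does not report the same rows again.
        int drain() {
            int r = (rows == null) ? 0 : rows;
            rows = null;
            return r;
        }
    }

    // Analyzer role: a single instance sums whatever the collectors send.
    static final class Analyzer {
        private int total;

        void analyze(int fromCollector) {
            total += fromCollector;
        }

        int total() {
            return total;
        }
    }

    // chunkSizes.get(p) lists the chunk sizes written by partition p.
    // Partitions are processed sequentially here to keep the sketch small.
    static int run(List<List<Integer>> chunkSizes) {
        Analyzer analyzer = new Analyzer();
        for (List<Integer> partition : chunkSizes) {
            Slot slot = new Slot();              // one slot per partition
            for (int size : partition) {
                slot.add(size);                  // writer records the chunk
                analyzer.analyze(slot.drain());  // collector call after the checkpoint
            }
            analyzer.analyze(slot.drain());      // extra collector call at end of partition
        }
        return analyzer.total();
    }
}
```

Because drain() clears the slot, the extra end-of-partition call contributes 0 rather than repeating the last chunk.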


Here's the implementation:

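MyItemWriter:

@Inject
private StepContext stepContext;

@Override
public void writeItems(List<Object> items) throws Exception {
    // ... write the items to the database ...

    // Add this chunk to the count not yet reported by the collector
    Object userData = stepContext.getTransientUserData();
    int partRowCount = userData != null ? (int) userData : 0;
    partRowCount += items.size();
    stepContext.setTransientUserData(partRowCount);
}

MyPartitionCollector:

@Inject
private StepContext stepContext;

@Override
public Serializable collectPartitionData() throws Exception {

    // get transient user data
    Object userData = stepContext.getTransientUserData();
    int partRowCount = userData != null ? (int) userData : 0;

    // Reset it: this method runs after every checkpoint and again at the
    // end of the partition, so a stale value would be counted twice
    stepContext.setTransientUserData(null);

    return partRowCount;
}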

MyPartitionAnalyzer:

private int rowCount = 0;

@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {
    // Called once per collector payload; rowCount is the running total
    rowCount += (Integer) fromCollector;
    System.out.printf("%d rows processed (all partitions).%n", rowCount);
}
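The question also asks for the count of each partition, not only the sum. One possible sketch, assuming the collector can learn its partition's id (for example from a partition property, which is not shown here), is to return a small serializable payload instead of a bare Integer and tally it per partition. PartitionCount and RowTally are hypothetical names; accept() is meant as the body of analyzeCollectorData().

```java
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical payload a collector could return instead of a bare Integer.
final class PartitionCount implements Serializable {
    private static final long serialVersionUID = 1L;

    final int partitionId;
    final int rows;

    PartitionCount(int partitionId, int rows) {
        this.partitionId = partitionId;
        this.rows = rows;
    }
}

// Hypothetical aggregation logic for an analyzer.
final class RowTally {
    // TreeMap keeps the partitions in id order for a stable summary.
    private final Map<Integer, Integer> rowsByPartition = new TreeMap<>();

    // Adds one collector payload to the tally of its partition.
    void accept(Serializable fromCollector) {
        PartitionCount pc = (PartitionCount) fromCollector;
        rowsByPartition.merge(pc.partitionId, pc.rows, Integer::sum);
    }

    int rowsFor(int partitionId) {
        return rowsByPartition.getOrDefault(partitionId, 0);
    }

    int total() {
        int sum = 0;
        for (int rows : rowsByPartition.values()) {
            sum += rows;
        }
        return sum;
    }

    // Per-partition counts followed by the overall total.
    String summary() {
        return rowsByPartition + " total=" + total();
    }
}
```

The summary could then be logged once the step has finished, rather than on every call.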

Reference: JSR352 v1.0 Final Release.pdf

Mincong Huang answered Jun 13 '23 19:06