Is there a way to change the skipping behavior of the ItemProcessor so that it behaves like the ItemWriter? Throwing a skippable exception in the ItemProcessor causes all already-accepted items in the chunk to be reprocessed, which makes the total work quadratic in the worst case. Is there a way to change this so that the chunk is rolled back once and the items are then processed one by one, just as the ItemWriter does?
Job definition:
@Bean
public Job job() {
    return jobBuilderFactory.get("job").start(step(null)).build();
}

@Bean
public Step step(ReaderProcessorWriter readerProcessorWriter) {
    return stepBuilderFactory.get("step")
            .<Integer, Integer>chunk(20).faultTolerant()
            .skip(RuntimeException.class).skipLimit(10)
            .reader(readerProcessorWriter)
            .processor(readerProcessorWriter)
            .writer(readerProcessorWriter)
            .build();
}
Reader, Processor and Writer:
@Component
public static class ReaderProcessorWriter extends ListItemReader<Integer> implements ItemProcessor<Integer, Integer>, ItemWriter<Integer> {

    private int run;

    public ReaderProcessorWriter() {
        super(List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
    }

    @Override
    public Integer process(Integer integer) {
        // Probably some long computation involving lots of DB reading, lasting minutes at worst
        if (integer >= 5) {
            System.out.println("FAIL: " + integer);
            throw new RuntimeException("Hue hue");
        }
        System.out.println("OK: " + integer);
        return integer;
    }

    @Override
    public void write(List<? extends Integer> list) {
        // Fail only on the first write attempt
        if (run++ == 0) {
            throw new RuntimeException("Writer");
        }
        System.out.println(list);
    }
}
Output:
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 5
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 6
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 7
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 8
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 9
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
OK: 0
[0]
OK: 1
[1]
OK: 2
[2]
OK: 3
[3]
OK: 4
[4]
In the example, items 0-4 are each processed 6 times before they are successfully written (assuming the writer does not throw).
If the ItemProcessor used the same skipping strategy as the ItemWriter, they would be processed only twice.
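The two counts can be checked with a small standalone simulation. This is only a sketch in plain Java with no Spring Batch involved: `restartOnSkip` models the restart-from-the-first-item pattern visible in the output above, and `scanOneByOne` models the writer-style strategy as I imagine it would apply to the processor (one failed pass, then each item on its own). The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntPredicate;

class SkipCostSimulation {

    /** Counts process() calls per item when every processor skip restarts the chunk. */
    static int[] restartOnSkip(int itemCount, IntPredicate fails) {
        int[] calls = new int[itemCount];
        Set<Integer> skipped = new HashSet<>();
        boolean chunkCompleted = false;
        while (!chunkCompleted) {
            chunkCompleted = true;
            for (int item = 0; item < itemCount; item++) {
                if (skipped.contains(item)) {
                    continue; // already marked as skipped, not processed again
                }
                calls[item]++;
                if (fails.test(item)) {
                    skipped.add(item);      // remember the bad item
                    chunkCompleted = false; // roll back and start the chunk over
                    break;
                }
            }
        }
        return calls;
    }

    /** Hypothetical writer-style strategy: one failed pass, then every item once on its own. */
    static int[] scanOneByOne(int itemCount, IntPredicate fails) {
        int[] calls = new int[itemCount];
        boolean failed = false;
        for (int item = 0; item < itemCount; item++) {
            calls[item]++;
            if (fails.test(item)) {
                failed = true; // first failure triggers the rollback
                break;
            }
        }
        if (failed) {
            for (int item = 0; item < itemCount; item++) {
                calls[item]++; // each item processed individually exactly once
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        IntPredicate failsFromFive = item -> item >= 5; // same rule as in the question
        // prints [6, 6, 6, 6, 6, 1, 1, 1, 1, 1]
        System.out.println(Arrays.toString(restartOnSkip(10, failsFromFive)));
        // prints [2, 2, 2, 2, 2, 2, 1, 1, 1, 1]
        System.out.println(Arrays.toString(scanOneByOne(10, failsFromFive)));
    }
}
```

With k good items followed by n failing ones, the first variant performs k * (n + 1) + n calls, while the second stays linear in the chunk size.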
You could try configuring the fault-tolerant processor to be non-transactional. It then caches the results of the successfully processed items, so that when the chunk is retried after a rollback, those results are taken from the cache instead of the items being reprocessed.
@Bean
public Step step(ReaderProcessorWriter readerProcessorWriter) {
    return stepBuilderFactory.get("step")
            .<Integer, Integer>chunk(20).faultTolerant()
            .skip(RuntimeException.class).skipLimit(10)
            .reader(readerProcessorWriter)
            .processor(readerProcessorWriter)
            .processorNonTransactional()
            .writer(readerProcessorWriter)
            .build();
}
You have to check whether this is acceptable for your use case. When JPA is used to process the items, I feel safer if, after a rollback, all entities that were loaded and processed in the rolled-back transaction are discarded rather than reused in a new transaction: they will be detached in the new transaction, which seems to make things more complicated to me.
If reprocessing an item takes a considerable amount of time, I would check whether the bottleneck operation within the processing of an item can be tuned, for example by caching the results of those expensive operations.
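As a rough illustration of that last idea, here is a minimal sketch of a memoizing wrapper around an expensive computation, written in plain Java so it runs without Spring Batch. `MemoizingFunction` is a hypothetical name, not a Spring Batch class; in a real step, the `process` method of your ItemProcessor would call it for the slow part. It assumes the input type has sensible `equals`/`hashCode` and that a cached result is safe to reuse after a rollback (so probably plain values rather than managed JPA entities).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper: remembers results of an expensive function, keyed by input. */
class MemoizingFunction<I, O> implements Function<I, O> {

    private final Function<I, O> delegate;
    private final Map<I, O> cache = new ConcurrentHashMap<>();

    MemoizingFunction(Function<I, O> delegate) {
        this.delegate = delegate;
    }

    @Override
    public O apply(I input) {
        // The delegate runs only on a cache miss. If it throws, nothing is
        // cached and the exception propagates, so failing items can still be
        // skipped. A null result is not cached either.
        return cache.computeIfAbsent(input, delegate);
    }

    /** Drop cached results, e.g. once a chunk has been written successfully. */
    void clear() {
        cache.clear();
    }

    public static void main(String[] args) {
        AtomicInteger expensiveCalls = new AtomicInteger();
        MemoizingFunction<Integer, Integer> square = new MemoizingFunction<>(x -> {
            expensiveCalls.incrementAndGet(); // stands in for the slow DB work
            return x * x;
        });

        square.apply(4); // computed
        square.apply(4); // served from the cache, as on a chunk retry
        System.out.println("expensive calls: " + expensiveCalls.get()); // prints "expensive calls: 1"
    }
}
```

The cache grows with every distinct input, so it should be cleared at some point; when to do that (per chunk, per step) depends on your job and is left open here.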