
Spring batch Job read from multiple sources

How can I read items from multiple databases? I already know it is possible with files.
The following example works for reading from multiple files:

...
<job id="readMultiFileJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
    <tasklet>
        <chunk reader="multiResourceReader" writer="flatFileItemWriter"
            commit-interval="1" />
    </tasklet>
    </step>
</job>
...
<bean id="multiResourceReader"
    class="org.springframework.batch.item.file.MultiResourceItemReader">
    <property name="resources" value="file:csv/inputs/domain-*.csv" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>
...

I have three beans like this:

<bean id="database2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="name" value="database2Reader" />
    <property name="dataSource" ref="dataSource2" />
    <property name="sql" value="select image from object where image like '%/images/%'" />
    <property name="rowMapper">
        <bean class="sym.batch.ImagesRowMapper2" />
    </property>
</bean>
xedo asked Jan 23 '14 09:01




1 Answer

There isn't a ready-to-use component that performs what you ask; the only solution is to write a custom ItemReader<> that delegates to JdbcCursorItemReader (or to HibernateCursorItemReader, or to any generic ItemReader implementation).
You need to set up everything required (data sources, sessions, the actual database readers) and bind all the delegate readers to your custom reader.

EDIT: You need to simulate a loop using recursion in ItemReader.read() and maintain the state of the reader and its delegates across job restarts.

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;

class MyItemReader<T> implements ItemReader<T>, ItemStream {
  private ItemReader<T>[] delegates;
  // Index of the next delegate to open
  private int delegateIndex;
  private ItemReader<T> currentDelegate;
  private ExecutionContext stepExecutionContext;

  public void setDelegates(ItemReader<T>[] delegates) {
    this.delegates = delegates;
  }

  // Keep a reference to the step's execution context so read() can open delegates with it
  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    this.stepExecutionContext = stepExecution.getExecutionContext();
  }

  public T read() throws Exception {
    T item = null;
    if(null != currentDelegate) {
      item = currentDelegate.read();
      if(null == item) {
        ((ItemStream)this.currentDelegate).close();
        this.currentDelegate = null;
      }
    }
    // Move to next delegate if previous was exhausted!
    if(null == item && this.delegateIndex < this.delegates.length) {
      this.currentDelegate = this.delegates[this.delegateIndex++];
      ((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
      update(this.stepExecutionContext);
      // Recurse to read() to simulate loop through delegates
      item = read();
    }
    return item;
  }

  public void open(ExecutionContext ctx) {
    // During open restore last active reader and restore its state
    if(ctx.containsKey("index")) {
      this.delegateIndex = ctx.getInt("index");
      if(this.delegateIndex < this.delegates.length) {
        this.currentDelegate = this.delegates[this.delegateIndex++];
        ((ItemStream)this.currentDelegate).open(ctx);
      }
    }
  }

  public void update(ExecutionContext ctx) {
    // Save the index of the active delegate (delegateIndex already points past it) and its state
    if(null != this.currentDelegate) {
      ctx.putInt("index", this.delegateIndex - 1);
      ((ItemStream)this.currentDelegate).update(ctx);
    } else {
      ctx.putInt("index", this.delegateIndex);
    }
  }

  public void close() {
    if(null != this.currentDelegate) {
      ((ItemStream)this.currentDelegate).close();
    }
  }
}

<bean id="myItemReader" class="path.to.MyItemReader">
  <property name="delegates">
    <array>
      <ref bean="itemReader1"/>
      <ref bean="itemReader2"/>
      <ref bean="itemReader3"/>
    </array>
  </property>
</bean>
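
The delegation idea can be tried out without Spring Batch on the classpath. The following is only a minimal sketch: SimpleReader, ListReader, ChainedReader and ChainedReaderDemo are hypothetical names invented for illustration (they are not Spring Batch types), in-memory lists stand in for the database readers, and a plain loop replaces the recursion. It leaves out restart state entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an item reader: read() returns null when exhausted.
interface SimpleReader<T> {
    T read();
}

// Hypothetical reader over an in-memory list, standing in for a database reader.
class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> it;

    ListReader(List<T> items) {
        this.it = items.iterator();
    }

    @Override
    public T read() {
        return it.hasNext() ? it.next() : null;
    }
}

// Reads each delegate until it is exhausted, then moves on to the next one.
class ChainedReader<T> implements SimpleReader<T> {
    private final List<SimpleReader<T>> delegates;
    private int index = 0; // index of the delegate currently being read

    ChainedReader(List<SimpleReader<T>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public T read() {
        while (index < delegates.size()) {
            T item = delegates.get(index).read();
            if (item != null) {
                return item;
            }
            index++; // current delegate is exhausted, try the next one
        }
        return null; // every delegate is exhausted
    }
}

class ChainedReaderDemo {
    // Chains three list-backed readers (the middle one is empty) and drains them.
    static List<String> demo() {
        List<SimpleReader<String>> delegates = new ArrayList<>();
        delegates.add(new ListReader<>(Arrays.asList("a1", "a2")));
        delegates.add(new ListReader<>(new ArrayList<String>()));
        delegates.add(new ListReader<>(Arrays.asList("c1")));

        SimpleReader<String> reader = new ChainedReader<>(delegates);
        List<String> out = new ArrayList<>();
        for (String s = reader.read(); s != null; s = reader.read()) {
            out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, a2, c1]
    }
}
```

An empty delegate in the middle is skipped transparently, which is the same behaviour the recursive read() is aiming for.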

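EDIT2: Remember to set the name property on every delegate; this is NECESSARY for MyItemReader.read() to work correctly, because each delegate stores its state in the shared ExecutionContext under keys prefixed with its name.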

<bean id="itemReader1" class="org.springframework.batch.item.database.JdbcCursorItemReader">
  <property name="name" value="itemReader1" />
  <!-- Set other properties -->
</bean>
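
Why the name matters can be illustrated with a small stand-alone sketch. It assumes, for illustration only, that each reader saves a counter in a shared map under a key built from its name; NamedCounter and NamePrefixDemo are hypothetical classes, the map stands in for the ExecutionContext, and the exact key format is an assumption rather than something to rely on.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a reader that saves how many items it has read.
class NamedCounter {
    private final String name;
    private int readCount;

    NamedCounter(String name) {
        this.name = name;
    }

    void read() {
        readCount++;
    }

    // Save state under a key prefixed with this reader's name (assumed key format).
    void update(Map<String, Integer> ctx) {
        ctx.put(name + ".read.count", readCount);
    }
}

class NamePrefixDemo {
    // Two readers save their state into the same shared context.
    static Map<String, Integer> save(String firstName, String secondName) {
        Map<String, Integer> ctx = new HashMap<>();
        NamedCounter first = new NamedCounter(firstName);
        NamedCounter second = new NamedCounter(secondName);
        first.read();
        first.read();
        first.read();
        second.read();
        first.update(ctx);
        second.update(ctx);
        return ctx;
    }

    public static void main(String[] args) {
        // Distinct names: both counters survive in the context.
        System.out.println(save("itemReader1", "itemReader2").size());
        // Same name: the second reader overwrites the first reader's state.
        System.out.println(save("reader", "reader").size());
    }
}
```

With identical names the first reader's position is lost, so a restarted job could not tell where that reader had stopped.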
Luca Basso Ricci answered Sep 29 '22 20:09