I'm following Spring doc for creating batch service. It implements ItemWriter
using JdbcBatchItemWriter
, so could you please help me write the MongoDb equivalent of following code using MongoItemWriter
?
I found two tutorials using MongoDb, but they use XML files to define beans & seem outdated.
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
// tag::readerwriterprocessor[]
@Bean
public ItemWriter<Person> writer(DataSource dataSource) {
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
writer.setDataSource(dataSource);
return writer;
}
// end::readerwriterprocessor[]
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}
Advertisements. An Item Reader reads data into the spring batch application from a particular source, whereas an Item Writer writes data from Spring Batch application to a particular destination. An Item processor is a class which contains the processing code which processes the data read in to the spring batch.
Specify the MongoDB document's primary key _ id using the @Id annotation. If we don't specify anything, MongoDB will generate an _ id field while creating the document.
MongoDB stores data in collections. Spring Data MongoDB maps the Customer class into a collection called customer . If you want to change the name of the collection, you can use Spring Data MongoDB's @Document annotation on the class.
package hello;
import com.mongodb.DBAddress;
import com.mongodb.MongoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
// tag::readerwriterprocessor[]
@Bean
public ItemReader<Person> reader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
reader.setResource(new ClassPathResource("sample-data.csv"));
reader.setLineMapper(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "firstName", "lastName" });
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}});
}});
return reader;
}
@Bean
public ItemProcessor<Person, Person> processor() {
return new PersonItemProcessor();
}
@Bean
public ItemWriter<Record> writer() {
MongoItemWriter<Record> writer = new MongoItemWriter<Record>();
try {
writer.setTemplate(mongoTemplate());
} catch (Exception e) {
log.error(e.toString());
}
writer.setCollection("people");
return writer;
}
// end::readerwriterprocessor[]
// tag::jobstep[]
@Bean
public Job importUserJob(JobBuilderFactory jobs, Step s1, JobExecutionListener listener) {
return jobs.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(s1)
.end()
.build();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// end::jobstep[]
@Bean
public MongoDbFactory mongoDbFactory() throws Exception {
return new SimpleMongoDbFactory(new MongoClient(), "db-name");
}
@Bean
public MongoTemplate mongoTemplate() throws Exception {
MongoTemplate mongoTemplate = new MongoTemplate(mongoDbFactory());
return mongoTemplate;
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With