Batch Processing Large Data Sets With Spring Boot and Spring Batch
In this article, I am going to demonstrate batch processing using Spring Batch, one of the Spring projects. Spring Batch provides the infrastructure for processing large volumes of data in batch jobs, including logging, transaction management, job restart (if a job did not complete), skipping of bad records, job processing statistics, and resource management.
Let us look at how Spring Batch works in a nutshell.
A Job is composed of one or more Steps. A Step is an object that encapsulates a sequential phase of a job and holds all the information needed to define and control that phase of processing; the Job delegates its work to the Steps it contains.
Spring Batch uses a chunk-oriented style of processing: items are read one at a time and collected into chunks, and each chunk is written out within a single transaction. Each item is read by an ItemReader and passed on to an ItemProcessor; once the number of processed items reaches the configured chunk size, the whole chunk is written out by the ItemWriter. The JobRepository stores step execution metadata periodically during processing, which is what makes restarts possible.
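The contract behind this model is small. Slightly simplified (the real interfaces live in org.springframework.batch.item and declare a few more specific exceptions), the three core interfaces look like this:

// The three core interfaces behind chunk-oriented processing, shown
// together here for brevity. read() returns null when the input is
// exhausted, process() may return null to filter an item out, and
// write() receives the whole chunk at once.
import java.util.List;

interface ItemReader<T> {
    T read() throws Exception;
}

interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}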
Let's get into coding.
Setting Up the Project
Create a sample Spring Boot application. Here is my sample project structure.
In this article, I will be using sample data that represents the voltage drop of a discharging capacitor. We will read this data from a CSV file and write it to an in-memory H2 database.
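For reference, each line of Volts.csv holds a voltage reading and a timestamp separated by a semicolon (the delimiter we configure later), with no header row, since the reader does not skip any lines. The values below are made up purely for illustration:

4.9000;0.0
3.1000;1.5
1.9000;3.0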
Add the required dependencies to pom.xml.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
</dependency>
The CSV file Volts.csv contains two fields, volt and time. Let us create a JPA entity called Voltage. Note that this entity is just for the example; it is not production-ready code.
package com.techshard.batch.dao.entity;

import javax.persistence.*;
import javax.validation.constraints.NotNull;
import java.math.BigDecimal;

@Entity
public class Voltage {

    @Id
    @Column(name = "ID", nullable = false)
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long id;

    @NotNull
    @Column(name = "volt", precision = 10, scale = 4, nullable = false)
    private BigDecimal volt;

    @NotNull
    @Column(name = "time", nullable = false)
    private double time;

    public Voltage() {
    }

    public Voltage(final BigDecimal volt, final double time) {
        this.volt = volt;
        this.time = time;
    }

    public long getId() {
        return id;
    }

    public BigDecimal getVolt() {
        return volt;
    }

    public void setVolt(final BigDecimal volt) {
        this.volt = volt;
    }

    public double getTime() {
        return time;
    }

    public void setTime(final double time) {
        this.time = time;
    }

    @Override
    public String toString() {
        // Used by the job listener's log output after the job completes.
        return "Voltage{id=" + id + ", volt=" + volt + ", time=" + time + "}";
    }
}
Batch Configuration
Let's create a batch configuration class:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
}
@EnableBatchProcessing enables Spring Batch features and provides a base configuration for setting up batch jobs in an @Configuration class.
We need to inject two components into the above class.
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
JobBuilderFactory creates a job builder. Similarly, StepBuilderFactory creates a step builder and initializes it with the job repository and transaction manager.
Configuring ItemReader
We will now define an ItemReader for our model Voltage, which will be used for reading data from the CSV file.
@Bean
public FlatFileItemReader<Voltage> reader() {
    return new FlatFileItemReaderBuilder<Voltage>()
            .name("voltItemReader")
            .resource(new ClassPathResource("Volts.csv"))
            .lineMapper(lineMapper())
            .build();
}
Here, we are creating a FlatFileItemReaderBuilder for the model Voltage.
name - the name of the ItemReader.
resource - the path of the resource file to be read.
lineMapper - the interface used to map lines from the file to a domain object.
The builder can alternatively configure tokenizing and mapping inline via delimited() (builds a delimited tokenizer), names() (the fields to be read), and fieldSetMapper() (maps the data in a FieldSet to an object). Since we pass a custom lineMapper(), that inline configuration is not needed here. Let us define the lineMapper() bean.
@Bean
public LineMapper<Voltage> lineMapper() {
    final DefaultLineMapper<Voltage> defaultLineMapper = new DefaultLineMapper<>();

    final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setDelimiter(";");
    lineTokenizer.setStrict(false);
    lineTokenizer.setNames(new String[] {"volt", "time"});

    final VoltageFieldSetMapper fieldSetMapper = new VoltageFieldSetMapper();

    defaultLineMapper.setLineTokenizer(lineTokenizer);
    defaultLineMapper.setFieldSetMapper(fieldSetMapper);
    return defaultLineMapper;
}
In the custom lineMapper, we specify the delimiter used in the CSV file as well as the FieldSetMapper that converts the tokenized string values into the entity's datatypes. The VoltageFieldSetMapper is defined as follows:
package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;

@Component
public class VoltageFieldSetMapper implements FieldSetMapper<Voltage> {

    @Override
    public Voltage mapFieldSet(FieldSet fieldSet) {
        final Voltage voltage = new Voltage();
        voltage.setVolt(fieldSet.readBigDecimal("volt"));
        voltage.setTime(fieldSet.readDouble("time"));
        return voltage;
    }
}
Configuring ItemProcessor
We will define the processor in the batch configuration as follows:
@Bean
public VoltageProcessor processor() {
    return new VoltageProcessor();
}
We have defined a custom processor, VoltageProcessor. Once an item has been read, this processor can transform it: converting data, applying business logic, and so on. This is just an example; a custom processor is not always required and can be defined depending on your application's requirements.
package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.springframework.batch.item.ItemProcessor;

import java.math.BigDecimal;

public class VoltageProcessor implements ItemProcessor<Voltage, Voltage> {

    @Override
    public Voltage process(final Voltage voltage) {
        final BigDecimal volt = voltage.getVolt();
        final double time = voltage.getTime();

        final Voltage processedVoltage = new Voltage();
        processedVoltage.setVolt(volt);
        processedVoltage.setTime(time);
        return processedVoltage;
    }
}
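An ItemProcessor can also filter items: returning null from process() drops the item from the chunk, so it never reaches the writer. A minimal sketch of such a processor (the negative-voltage rule is a made-up example, not part of this project):

public class ValidVoltageProcessor implements ItemProcessor<Voltage, Voltage> {

    @Override
    public Voltage process(final Voltage voltage) {
        // Returning null tells Spring Batch to filter this item out;
        // it will not be passed to the ItemWriter.
        if (voltage.getVolt().signum() < 0) {
            return null;
        }
        return voltage;
    }
}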
Configuring ItemWriter
Once the data is processed, it needs to be stored in a database as per our requirement. We will define a JdbcBatchItemWriter to insert the data into a database table. There is also a JPA-specific JpaItemWriter, which can be used with an EntityManager.
@Bean
public JdbcBatchItemWriter<Voltage> writer(final DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Voltage>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO voltage (volt, time) VALUES (:volt, :time)")
            .dataSource(dataSource)
            .build();
}
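If you would rather reuse the JPA mappings than write SQL by hand, a JpaItemWriter could replace the bean above. A sketch, assuming the auto-configured EntityManagerFactory is injected (keep in mind the step would then need a JPA-aware transaction manager):

@Bean
public JpaItemWriter<Voltage> jpaWriter(final EntityManagerFactory entityManagerFactory) {
    // Persists each Voltage entity through JPA instead of a hand-written INSERT.
    final JpaItemWriter<Voltage> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return writer;
}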
Job and Step Configuration
We will now define a Step that wires together the reader, the processor, and the writer. The StepBuilderFactory we injected earlier creates the step; the step itself is then injected into our Job bean.
@Bean
public Step step1(JdbcBatchItemWriter<Voltage> writer) {
    return stepBuilderFactory.get("step1")
            .<Voltage, Voltage>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
}
Here, step1 is just the name we give to the Step. We also specify the chunk size (10) in the step configuration: up to ten items are read and processed, then written out together in one transaction.
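The step builder is also where features such as record skipping, mentioned at the start of the article, are configured. For example, the step definition above could be made fault tolerant so that malformed CSV lines are skipped instead of failing the job; a sketch (the skip limit of 10 is arbitrary, and FlatFileParseException is what the reader throws when a line cannot be parsed):

@Bean
public Step step1(JdbcBatchItemWriter<Voltage> writer) {
    return stepBuilderFactory.get("step1")
            .<Voltage, Voltage>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .faultTolerant()
            // Skip up to 10 unparseable lines before failing the job.
            .skipLimit(10)
            .skip(FlatFileParseException.class)
            .build();
}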
Finally, a Job is defined as follows:
@Bean
public Job importVoltageJob(NotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importVoltageJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
}
Note that we have passed a NotificationListener, which extends Spring Batch's JobExecutionListenerSupport and can log results before or after job execution. Here, we have only overridden afterJob(); JobExecutionListenerSupport also provides beforeJob() for logging information before the job runs.
package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class NotificationListener extends JobExecutionListenerSupport {

    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public NotificationListener(final JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(final JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            LOGGER.info("!!! JOB FINISHED! Time to verify the results");
            jdbcTemplate.query("SELECT volt, time FROM voltage",
                    (rs, row) -> new Voltage(rs.getBigDecimal(1), rs.getDouble(2))
            ).forEach(voltage -> LOGGER.info("Found <" + voltage + "> in the database."));
        }
    }
}
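For completeness, a beforeJob() override might look like the following. This is just a sketch and is not part of the project's listener above:

@Override
public void beforeJob(final JobExecution jobExecution) {
    // Log the job name and parameters before any step runs.
    LOGGER.info("Starting job {} with parameters {}",
            jobExecution.getJobInstance().getJobName(),
            jobExecution.getJobParameters());
}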
Before we run the application, we will enable the H2 (in-memory) console in application.properties.
spring.datasource.url=jdbc:h2:mem:batchdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=password
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.h2.console.enabled=true
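By default, Spring Boot launches all configured jobs on startup. If you would rather trigger the job yourself, for example from a REST controller using a JobLauncher, that behavior can be turned off in Spring Boot 2.x with an additional property:

spring.batch.job.enabled=false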
Additionally, I have configured an aspect using Spring AOP to measure the time taken by the batch execution.
package com.techshard.batch;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class TracePerformanceAspect {

    private final Logger logger = LoggerFactory.getLogger(TracePerformanceAspect.class);

    @Around("execution(* com.techshard..*.*(..))")
    public Object logTracePerformanceAspect(ProceedingJoinPoint joinPoint) throws Throwable {
        // Get intercepted method details
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        String className = methodSignature.getDeclaringType().getSimpleName();
        String methodName = methodSignature.getName();

        long start = System.currentTimeMillis();
        Object result = joinPoint.proceed();
        long end = System.currentTimeMillis();

        // Log method execution time
        logger.info("Execution time of " + className + "." + methodName + " :: " + (end - start) + " ms");
        return result;
    }
}
Running the Application
Run the Spring Boot application. With the batch starter on the classpath, Spring Boot launches the configured job automatically at startup. Once the application has started, open the H2 console at http://localhost:8080/h2-console/ and log in with the credentials configured in application.properties.
Once we log in, we will see the VOLTAGE table along with the metadata tables created by Spring Batch, such as BATCH_JOB_INSTANCE and BATCH_JOB_EXECUTION. These tables hold the details of every job execution: job name, status, id, and so on.
Conclusion
This article just scratched the surface of Spring Batch in general. The example used in this article is not production-ready code. You can define job configuration depending on your project requirements. I hope you enjoyed this article. Let me know if you have any comments or suggestions.
The complete code can be found on my GitHub repository.