Spring Batch is a lightweight, robust framework for processing batch data autonomously, without user intervention. It supports two step execution styles: Tasklet and chunk-oriented processing. Chunk processing speeds up data handling and reduces the overhead of repeatedly opening and closing database and network connections.
Chunk processing consists of three tasks: ItemReader, ItemProcessor, and ItemWriter. The ItemReader reads data from a source, the ItemProcessor transforms each item, and the ItemWriter writes the results to the target. When the Spring Batch job runs, the reader fetches each item, the processor transforms it, and the writer sends it to the destination. This cycle repeats until the reader signals the end of the data by returning null.
Chunk-oriented processing handles one block of data at a time. If the chunk size is set to a value greater than one, for example 5, the batch calls the ItemReader's read method 5 times and the ItemProcessor's process method 5 times. The ItemWriter's write method is then called only once, receiving all 5 processed items, so all five can be committed to a database or transmitted over the network in a single operation.
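The read/process/write cycle described above can be sketched in plain Java. This is a simplified illustration of the chunk loop, not the actual Spring Batch implementation; the class name `ChunkLoopSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Simplified sketch of chunk-oriented processing (not the real Spring Batch
// code): read up to chunkSize items, process each one, then write the whole
// chunk in a single call.
public class ChunkLoopSketch {

    // Returns the number of times the "writer" was invoked; the processed
    // items are collected into the 'written' list.
    public static int run(List<String> source, int chunkSize,
                          Function<String, String> processor,
                          List<String> written) {
        int writeCalls = 0;
        Iterator<String> reader = source.iterator();
        while (reader.hasNext()) {
            // Read phase: fetch up to chunkSize items
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(reader.next());
            }
            // Process phase: transform each item in the chunk
            List<String> processed = new ArrayList<>();
            for (String item : chunk) {
                processed.add(processor.apply(item));
            }
            // Write phase: one write call per chunk
            written.addAll(processed);
            writeCalls++;
        }
        return writeCalls;
    }
}
```

With 5 items and a chunk size of 2, the loop writes three chunks of sizes 2, 2, and 1, so the writer runs three times instead of five.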
This example reuses the Spring Batch classes from the previous post. The basic Java classes can be found at the link below.
https://www.yawintutor.com/spring-boot-batch-example-step-by-step/
Default Chunk Implementation
The code below illustrates the default chunk configuration, with the chunk size set to 1. The reader fetches one item, the processor transforms it, and the writer sends it to the destination; all three tasks run for every single item. In this scenario, the database or network connection is opened and closed each time an item is written.
@Bean
public Step createStep() {
    return stepBuilderFactory.get("MyStep")
            .<String, String> chunk(1)
            .reader(new MyCustomReader())
            .processor(new MyCustomProcessor())
            .writer(new MyCustomWriter())
            .build();
}
Complete Spring Boot configuration file – BatchConfig.java
package com.yawintutor;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job createJob() {
        return jobBuilderFactory.get("MyJob")
                .incrementer(new RunIdIncrementer())
                .flow(createStep()).end().build();
    }

    @Bean
    public Step createStep() {
        return stepBuilderFactory.get("MyStep")
                .<String, String> chunk(1)
                .reader(new MyCustomReader())
                .processor(new MyCustomProcessor())
                .writer(new MyCustomWriter())
                .build();
    }
}
Default Chunk logs
With the default chunk size of 1, the logs appear as shown below. Each item is read, processed, and written to the destination in turn, and the log shows this three-step cycle repeating for every item.
2021-07-23 11:15:16.982 INFO 44295 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=MyJob]] launched with the following parameters: [{run.id=4, time=2021-07-22 16:22:57.293}]
2021-07-23 11:15:17.023 INFO 44295 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [MyStep]
MyCustomReader : Reading data : 0 Zero
MyCustomProcessor : Processing data : 0 Zero
MyCustomWriter : Writing data : 0 ZERO
MyCustomWriter : Writing data : completed
MyCustomReader : Reading data : 1 One
MyCustomProcessor : Processing data : 1 One
MyCustomWriter : Writing data : 1 ONE
MyCustomWriter : Writing data : completed
MyCustomReader : Reading data : 2 Two
MyCustomProcessor : Processing data : 2 Two
MyCustomWriter : Writing data : 2 TWO
MyCustomWriter : Writing data : completed
MyCustomReader : Reading data : 3 Three
MyCustomProcessor : Processing data : 3 Three
MyCustomWriter : Writing data : 3 THREE
MyCustomWriter : Writing data : completed
MyCustomReader : Reading data : 4 Four
MyCustomProcessor : Processing data : 4 Four
MyCustomWriter : Writing data : 4 FOUR
MyCustomWriter : Writing data : completed
MyCustomReader : Reading data : 5 Five
MyCustomProcessor : Processing data : 5 Five
MyCustomWriter : Writing data : 5 FIVE
MyCustomWriter : Writing data : completed
2021-07-23 11:15:17.052 INFO 44295 --- [ main] o.s.batch.core.step.AbstractStep : Step: [MyStep] executed in 29ms
2021-07-23 11:15:17.057 INFO 44295 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=MyJob]] completed with the following parameters: [{run.id=4, time=2021-07-22 16:22:57.293}] and the following status: [COMPLETED] in 42ms
Connectivity optimisation using chunk
In the following example, the chunk size is set to a value larger than 1. The ItemReader's read method is called the configured number of times before processing begins. The ItemProcessor then processes all of the read items. Finally, the ItemWriter is called once with all of the processed items and writes them in a single call.
@Bean
public Step createStep() {
    return stepBuilderFactory.get("MyStep")
            .<String, String> chunk(6)
            .reader(new MyCustomReader())
            .processor(new MyCustomProcessor())
            .writer(new MyCustomWriter())
            .build();
}
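The saving from the chunk(6) configuration above is easy to quantify: the number of write calls is the item count divided by the chunk size, rounded up. The helper below is purely illustrative (the class name `WriteCallCount` is not part of Spring Batch).

```java
// Illustrative arithmetic for chunk-oriented writes: the writer is invoked
// once per chunk, i.e. ceil(itemCount / chunkSize) times in total.
public class WriteCallCount {

    public static int writeCalls(int itemCount, int chunkSize) {
        return (itemCount + chunkSize - 1) / chunkSize; // ceiling division
    }
}
```

For the 6 items in this example, chunk(6) yields a single write call, while the default chunk(1) would yield 6 separate write calls, each potentially opening and closing a connection.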
Complete Spring Boot configuration file – BatchConfig.java
package com.yawintutor;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job createJob() {
        return jobBuilderFactory.get("MyJob")
                .incrementer(new RunIdIncrementer())
                .flow(createStep()).end().build();
    }

    @Bean
    public Step createStep() {
        return stepBuilderFactory.get("MyStep")
                .<String, String> chunk(6)
                .reader(new MyCustomReader())
                .processor(new MyCustomProcessor())
                .writer(new MyCustomWriter())
                .build();
    }
}
Logs for a chunk size greater than 1
As the logs below show, the ItemWriter's write method is called only once, while the ItemReader and ItemProcessor call the read and process methods once per item, up to the configured chunk size.
2021-07-23 11:19:12.242 INFO 44491 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=MyJob]] launched with the following parameters: [{run.id=5, time=2021-07-22 16:22:57.293}]
2021-07-23 11:19:12.276 INFO 44491 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [MyStep]
MyCustomReader : Reading data : 0 Zero
MyCustomReader : Reading data : 1 One
MyCustomReader : Reading data : 2 Two
MyCustomReader : Reading data : 3 Three
MyCustomReader : Reading data : 4 Four
MyCustomReader : Reading data : 5 Five
MyCustomProcessor : Processing data : 0 Zero
MyCustomProcessor : Processing data : 1 One
MyCustomProcessor : Processing data : 2 Two
MyCustomProcessor : Processing data : 3 Three
MyCustomProcessor : Processing data : 4 Four
MyCustomProcessor : Processing data : 5 Five
MyCustomWriter : Writing data : 0 ZERO
MyCustomWriter : Writing data : 1 ONE
MyCustomWriter : Writing data : 2 TWO
MyCustomWriter : Writing data : 3 THREE
MyCustomWriter : Writing data : 4 FOUR
MyCustomWriter : Writing data : 5 FIVE
MyCustomWriter : Writing data : completed
2021-07-23 11:19:12.295 INFO 44491 --- [ main] o.s.batch.core.step.AbstractStep : Step: [MyStep] executed in 19ms
2021-07-23 11:19:12.301 INFO 44491 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=MyJob]] completed with the following parameters: [{run.id=5, time=2021-07-22 16:22:57.293}] and the following status: [COMPLETED] in 34ms
Chunk Tasks
As mentioned earlier, chunk processing consists of three tasks: ItemReader, ItemProcessor, and ItemWriter. The three implementing classes are given below.
ItemReader Implementation
MyCustomReader.java
package com.yawintutor;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class MyCustomReader implements ItemReader<String> {

    private String[] stringArray = { "Zero", "One", "Two", "Three", "Four", "Five" };
    private int index = 0;

    @Override
    public String read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {
        if (index >= stringArray.length) {
            return null;
        }
        String data = index + " " + stringArray[index];
        index++;
        System.out.println("MyCustomReader : Reading data : " + data);
        return data;
    }
}
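The key contract in the reader above is that returning null marks the end of the input, which is what ends the step. The standalone variant below (the class name `StandaloneReader` is illustrative, with the Spring Batch interface and logging removed) makes that contract easy to verify in isolation.

```java
// Standalone variant of MyCustomReader, without the Spring Batch interface:
// each call to read() returns the next item, and null signals that the
// input is exhausted, which is how Spring Batch knows to finish the step.
public class StandaloneReader {

    private final String[] stringArray = { "Zero", "One", "Two", "Three", "Four", "Five" };
    private int index = 0;

    public String read() {
        if (index >= stringArray.length) {
            return null; // end of data
        }
        String data = index + " " + stringArray[index];
        index++;
        return data;
    }
}
```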
ItemProcessor Implementation
MyCustomProcessor.java
package com.yawintutor;

import org.springframework.batch.item.ItemProcessor;

public class MyCustomProcessor implements ItemProcessor<String, String> {

    @Override
    public String process(String data) throws Exception {
        System.out.println("MyCustomProcessor : Processing data : " + data);
        data = data.toUpperCase();
        return data;
    }
}
ItemWriter Implementation
MyCustomWriter.java
package com.yawintutor;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MyCustomWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> list) throws Exception {
        for (String data : list) {
            System.out.println("MyCustomWriter : Writing data : " + data);
        }
        System.out.println("MyCustomWriter : Writing data : completed");
    }
}