Spring Boot Batch parallel processing runs multiple batch steps at the same time. Without it, running several batch tasks concurrently would mean creating multiple schedulers that fire at the same time. With it, a single job can execute several steps concurrently.
In a parallel flow, the steps run concurrently, and the job waits until all of them have completed before moving on to the next step. Independent steps therefore no longer wait for each other, which can shorten the total run time of the job. Depending on the nature of the data and the processing requirements, Spring Boot Batch steps can be configured to run either in parallel or sequentially.
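The wait-for-all behavior described above is a fork/join pattern. As a rough analogy only (this is plain java.util.concurrent, not the Spring Batch API), the sketch below runs a first step, then three steps concurrently on a thread pool, and continues only after invokeAll has returned for all three. SplitSemanticsSketch and runJob are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SplitSemanticsSketch {

    // Returns the order in which the simulated steps finished.
    static List<String> runJob() {
        // Synchronized because B, C and D append from different threads.
        List<String> finished = Collections.synchronizedList(new ArrayList<>());

        // The first step runs alone on the calling thread.
        finished.add("StepA");

        // The next three steps are submitted together, like a split.
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> parallelSteps = new ArrayList<>();
            for (String name : List.of("StepB", "StepC", "StepD")) {
                parallelSteps.add(() -> {
                    finished.add(name);
                    return name;
                });
            }
            // invokeAll blocks until every task is done: this is the "join".
            for (Future<String> result : executor.invokeAll(parallelSteps)) {
                result.get(); // rethrows the failure of any parallel step
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for steps", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A parallel step failed", e.getCause());
        } finally {
            executor.shutdown();
        }

        // Reached only after B, C and D have all finished.
        finished.add("JobComplete");
        return finished;
    }

    public static void main(String[] args) {
        // B, C and D may appear in any order between StepA and JobComplete.
        System.out.println(runJob());
    }
}
```

Only the relative position of StepA and JobComplete is fixed; the order of the three parallel steps can change from run to run, just as it does in the batch log.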
In this post, we'll look at how to set up Spring Boot Batch for parallel processing. The step-by-step instructions show how to configure a job in which one step runs first and the remaining steps then run concurrently.
Batch Configuration
Spring Batch achieves parallel processing through the Flow interface. A flow specifies which steps run and in what order. A job contains one or more flows, and each flow consists of steps that run either sequentially or in parallel; a split, created with FlowBuilder.split(TaskExecutor), runs several flows concurrently on the given TaskExecutor. The steps themselves contain the actual work. All of these batch objects are defined in the batch configuration class. In the configuration below, MyStepA runs first, and then MyStepB, MyStepC and MyStepD run in parallel.
BatchConfig.java
package com.yawintutor;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job createJob() {
        // Wrap each step in its own flow.
        Flow flowA = new FlowBuilder<Flow>("flowA").start(createStepA()).build();
        Flow flowB = new FlowBuilder<Flow>("flowB").start(createStepB()).build();
        Flow flowC = new FlowBuilder<Flow>("flowC").start(createStepC()).build();
        Flow flowD = new FlowBuilder<Flow>("flowD").start(createStepD()).build();

        // Split: run flows B, C and D concurrently.
        // SimpleAsyncTaskExecutor starts a new thread for each flow.
        Flow parallelFlow = new FlowBuilder<Flow>("parallelFlow")
                .split(new SimpleAsyncTaskExecutor())
                .add(flowB, flowC, flowD)
                .build();

        // Run flowA first, then the split. The job completes only after
        // all three parallel flows have finished.
        return jobBuilderFactory.get("MyJob")
                .incrementer(new RunIdIncrementer())
                .start(flowA)
                .next(parallelFlow)
                .end()
                .build();
    }

    // Each step reads, processes and writes String items with a chunk size of 1.
    @Bean
    public Step createStepA() {
        return stepBuilderFactory.get("MyStepA").<String, String>chunk(1)
                .reader(new MyCustomReader("StepA"))
                .processor(new MyCustomProcessor("StepA"))
                .writer(new MyCustomWriter("StepA"))
                .build();
    }

    @Bean
    public Step createStepB() {
        return stepBuilderFactory.get("MyStepB").<String, String>chunk(1)
                .reader(new MyCustomReader("StepB"))
                .processor(new MyCustomProcessor("StepB"))
                .writer(new MyCustomWriter("StepB"))
                .build();
    }

    @Bean
    public Step createStepC() {
        return stepBuilderFactory.get("MyStepC").<String, String>chunk(1)
                .reader(new MyCustomReader("StepC"))
                .processor(new MyCustomProcessor("StepC"))
                .writer(new MyCustomWriter("StepC"))
                .build();
    }

    @Bean
    public Step createStepD() {
        return stepBuilderFactory.get("MyStepD").<String, String>chunk(1)
                .reader(new MyCustomReader("StepD"))
                .processor(new MyCustomProcessor("StepD"))
                .writer(new MyCustomWriter("StepD"))
                .build();
    }
}
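The split above hands each flow to SimpleAsyncTaskExecutor, which starts a new thread for every task and, by default, places no limit on how many run at once. That is harmless with three flows, but for larger splits a bounded executor (for example Spring's ThreadPoolTaskExecutor, or SimpleAsyncTaskExecutor with a concurrency limit) may be preferable. The sketch below uses only java.util.concurrent, not Spring, to illustrate the effect of bounding: six simulated steps on a fixed pool of two threads never have more than two running at once. BoundedPoolSketch and maxConcurrent are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {

    // Runs `tasks` simulated steps on a fixed pool of `poolSize` threads and
    // returns the largest number of steps that were running at the same time.
    static int maxConcurrent(int tasks, int poolSize) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // simulated step work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the pool", e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Six steps, but never more than two running at once.
        System.out.println("Peak concurrency: " + maxConcurrent(6, 2));
    }
}
```

The extra steps simply queue until a thread becomes free, so the total work is the same while the number of threads stays fixed.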
ItemReader Implementation
The ItemReader interface is used to create reader objects, which retrieve the data to be processed from a source. ItemReader declares a single read method. While a step runs, Spring Batch calls read repeatedly; each call returns the next item, and a return value of null signals that the input is exhausted and the step can finish.
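The read contract can be modelled without Spring. In the sketch below, SimpleReader is a hypothetical stand-in for ItemReader (it is not the Spring interface), ArrayReader mirrors MyCustomReader, and drain keeps calling read until it gets null, as a step does. Calling drain a second time on the same reader returns nothing, because the reader still remembers its position; this is why a reader instance that is reused across job runs needs its position reset.

```java
import java.util.ArrayList;
import java.util.List;

public class ReaderContractSketch {

    // Hypothetical stand-in for Spring Batch's ItemReader<T>: one item per
    // call, null once the input is exhausted.
    interface SimpleReader<T> {
        T read();
    }

    // Mirrors MyCustomReader: walks an array and returns null at the end.
    static class ArrayReader implements SimpleReader<String> {
        private final String[] items;
        private int index = 0;

        ArrayReader(String... items) {
            this.items = items;
        }

        @Override
        public String read() {
            if (index >= items.length) {
                return null; // end of input
            }
            String data = index + " " + items[index];
            index++;
            return data;
        }
    }

    // Keeps calling read() until it returns null, like a step does.
    static List<String> drain(SimpleReader<String> reader) {
        List<String> out = new ArrayList<>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        ArrayReader reader = new ArrayReader("Zero", "One", "Two");
        System.out.println(drain(reader)); // [0 Zero, 1 One, 2 Two]
        System.out.println(drain(reader)); // [] : the position was not reset
    }
}
```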
MyCustomReader.java
package com.yawintutor;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class MyCustomReader implements ItemReader<String> {

    private String[] stringArray = { "Zero", "One", "Two" };
    private int index = 0;
    private String name;

    public MyCustomReader(String name) {
        this.name = name;
    }

    @Override
    public String read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {
        if (index >= stringArray.length) {
            // The step bean (and this reader) is reused for every job run,
            // so reset the position before signalling the end of input;
            // otherwise every run after the first would read nothing.
            index = 0;
            return null; // null tells Spring Batch there is no more data
        }
        String data = index + " " + stringArray[index];
        index++;
        System.out.println("MyCustomReader : Reading data : " + name + " : " + data);
        return data;
    }
}
ItemProcessor Implementation
The ItemProcessor interface is used to implement processor objects. A processor transforms each item read from the source before it is passed to the writer. The ItemProcessor interface declares a single process method, which Spring Batch calls once for every item read. In this example, the processor converts the data to upper case.
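To show what the processor contributes in isolation, the sketch below models it with java.util.function.Function instead of Spring's ItemProcessor. processAll applies the processor to each item and keeps only non-null results, following the ItemProcessor rule that returning null filters the item out so it never reaches the writer. ProcessorSketch, processAll and the filtering lambda are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ProcessorSketch {

    // Applies the processor to every item; a null result means "filter this
    // item out", so only non-null results are kept for the writer.
    static List<String> processAll(List<String> items, Function<String, String> processor) {
        List<String> toWrite = new ArrayList<>();
        for (String item : items) {
            String result = processor.apply(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) {
        List<String> read = List.of("0 Zero", "1 One", "2 Two");

        // Same transformation as MyCustomProcessor.
        System.out.println(processAll(read, String::toUpperCase)); // [0 ZERO, 1 ONE, 2 TWO]

        // Hypothetical filtering processor: drops items that start with "1".
        System.out.println(processAll(read, s -> s.startsWith("1") ? null : s.toUpperCase())); // [0 ZERO, 2 TWO]
    }
}
```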
MyCustomProcessor.java
package com.yawintutor;

import org.springframework.batch.item.ItemProcessor;

public class MyCustomProcessor implements ItemProcessor<String, String> {

    private String name;

    public MyCustomProcessor(String name) {
        this.name = name;
    }

    // Called once for each item read; the returned value is passed to the writer.
    @Override
    public String process(String data) throws Exception {
        System.out.println("MyCustomProcessor : Processing data : " + name + " : " + data);
        return data.toUpperCase();
    }
}
ItemWriter Implementation
The ItemWriter interface is used to create writer objects, which store the processed data in a destination. ItemWriter declares a write method that Spring Batch calls once per chunk, passing all the processed items of that chunk as a list. Because each step here uses a chunk size of 1, write receives one item per call.
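The chunk size decides how often write is called. The sketch below is a simplified, single-threaded model of a chunk-oriented step using plain Java collections, not Spring Batch itself: it reads up to chunkSize items, processes them (upper-casing, as MyCustomProcessor does), and passes the whole chunk to the writer in one call. In Spring Batch each chunk is also committed in its own transaction, which this model leaves out. ChunkLoopSketch and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {

    // Simplified chunk-oriented step: returns the list passed to the writer
    // on each write call, in order. chunkSize must be at least 1.
    static List<List<String>> run(List<String> input, int chunkSize) {
        Iterator<String> reader = input.iterator();
        List<List<String>> writeCalls = new ArrayList<>();
        while (reader.hasNext()) {
            // Read phase: collect up to chunkSize items.
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(reader.next());
            }
            // Process phase: transform each item, as MyCustomProcessor does.
            List<String> processed = new ArrayList<>();
            for (String item : chunk) {
                processed.add(item.toUpperCase());
            }
            // Write phase: the whole chunk goes to the writer in one call.
            writeCalls.add(processed);
        }
        return writeCalls;
    }

    public static void main(String[] args) {
        List<String> input = List.of("0 Zero", "1 One", "2 Two");
        System.out.println(run(input, 1)); // [[0 ZERO], [1 ONE], [2 TWO]]
        System.out.println(run(input, 2)); // [[0 ZERO, 1 ONE], [2 TWO]]
    }
}
```

With a chunk size of 1, as in BatchConfig, the writer runs once per item, which is why every item in the log is followed by its own "completed" line.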
MyCustomWriter.java
package com.yawintutor;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MyCustomWriter implements ItemWriter<String> {

    private String name;

    public MyCustomWriter(String name) {
        this.name = name;
    }

    // Called once per chunk with all processed items of that chunk.
    @Override
    public void write(List<? extends String> list) throws Exception {
        for (String data : list) {
            System.out.println("MyCustomWriter : Writing data : " + name + " : " + data);
        }
        System.out.println("MyCustomWriter : Writing data : completed. - " + name);
    }
}
Scheduler Configuration
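The scheduler configuration runs the batch job at regular intervals. With @Scheduled(fixedDelay = 5000, initialDelay = 5000), the scheduled method first runs 5 seconds after the application starts and then again 5 seconds after each previous run finishes. The method calls JobLauncher.run, which executes the batch job. A new "time" job parameter is passed on every call so that each run creates a new job instance.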
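The JDK's ScheduledExecutorService.scheduleWithFixedDelay follows the same rule as fixedDelay (the delay is measured from the end of one run to the start of the next), so the sketch below uses it, not Spring, with short delays to illustrate the schedule. Each run records a distinct parameter value, mirroring the "time" parameter in SchedulerConfig: Spring Batch will not re-run a job instance that already completed with identical parameters, so a changing value is what lets the job run again on every cycle. FixedDelaySketch and schedule are hypothetical names, and System.nanoTime() stands in for the formatted timestamp.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelaySketch {

    // Runs a simulated job `runs` times: first after `initialDelayMs`, then
    // `delayMs` after the previous run has finished. Returns the "time"
    // parameter generated for each run.
    static List<String> schedule(int runs, long initialDelayMs, long delayMs) {
        List<String> parameters = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch remaining = new CountDownLatch(runs);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            if (remaining.getCount() == 0) {
                return; // enough runs recorded; ignore any extra tick
            }
            // Stand-in for jobLauncher.run(job, params): a fresh value per run
            // so that every run would map to a new job instance.
            parameters.add("time=" + System.nanoTime());
            remaining.countDown();
        }, initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for runs", e);
        } finally {
            scheduler.shutdownNow();
        }
        return parameters;
    }

    public static void main(String[] args) {
        System.out.println(schedule(3, 50, 20));
    }
}
```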
SchedulerConfig.java
package com.yawintutor;

import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
public class SchedulerConfig {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");

    // First run 5 seconds after startup, then 5 seconds after each run ends.
    @Scheduled(fixedDelay = 5000, initialDelay = 5000)
    public void scheduleByFixedDelay() throws Exception {
        System.out.println("Batch job starting");
        // A new "time" value on every run makes each run a new job instance.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("time", format.format(Calendar.getInstance().getTime()))
                .toJobParameters();
        // The default JobLauncher runs the job synchronously and returns when it ends.
        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("Batch job finished with status " + execution.getStatus() + "\n");
    }
}
Spring Boot Main Method
The Spring Boot main class is shown below. Its main method is invoked when the Spring Boot application starts.
SpringBootBatch3Application.java
package com.yawintutor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootBatch3Application {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootBatch3Application.class, args);
    }
}
Application properties file
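The application.properties file contains two properties. The first configures the database URL (here, a file-based H2 database). The second tells Spring Boot to always create the Spring Batch metadata tables in the database. In Spring Boot 2.5 and later, spring.batch.initialize-schema is deprecated in favor of spring.batch.jdbc.initialize-schema.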
application.properties
spring.datasource.url=jdbc:h2:file:./DB
spring.batch.initialize-schema=ALWAYS
pom.xml
The pom.xml file contains the Spring Boot Batch and database dependencies. The spring-boot-starter-batch dependency brings in all the jars needed for Spring Batch, and the h2 dependency provides the database that stores the batch metadata.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.yawintutor</groupId>
    <artifactId>SpringBootBatch3</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootBatch3</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
How to run Spring Boot Batch
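While the Spring Boot application is running, the scheduler launches the batch job through the JobLauncher on each cycle. By default, Spring Boot also launches the job once at startup; set spring.batch.job.enabled=false if only the scheduler should launch it. In the log below, MyStepA runs first on the main thread. When it has completed, MyStepB, MyStepC and MyStepD start almost simultaneously on separate SimpleAsyncTaskExecutor threads (shown truncated as cTaskExecutor-1, -2 and -3), and their output is interleaved. The job is reported as COMPLETED only after all three parallel steps have finished.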
2021-07-24 14:27:21.760 INFO 87719 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=MyJob]] launched with the following parameters: [{run.id=2, time=2021-07-23 17:36:15.420}]
2021-07-24 14:27:21.802 INFO 87719 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [MyStepA]
MyCustomReader : Reading data : StepA : 0 Zero
MyCustomProcessor : Processing data : StepA : 0 Zero
MyCustomWriter : Writing data : StepA : 0 ZERO
MyCustomWriter : Writing data : completed. - StepA
MyCustomReader : Reading data : StepA : 1 One
MyCustomProcessor : Processing data : StepA : 1 One
MyCustomWriter : Writing data : StepA : 1 ONE
MyCustomWriter : Writing data : completed. - StepA
MyCustomReader : Reading data : StepA : 2 Two
MyCustomProcessor : Processing data : StepA : 2 Two
MyCustomWriter : Writing data : StepA : 2 TWO
MyCustomWriter : Writing data : completed. - StepA
2021-07-24 14:27:21.823 INFO 87719 --- [ main] o.s.batch.core.step.AbstractStep : Step: [MyStepA] executed in 21ms
2021-07-24 14:27:21.832 INFO 87719 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [MyStepC]
2021-07-24 14:27:21.832 INFO 87719 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [MyStepB]
2021-07-24 14:27:21.833 INFO 87719 --- [cTaskExecutor-3] o.s.batch.core.job.SimpleStepHandler : Executing step: [MyStepD]
MyCustomReader : Reading data : StepB : 0 Zero
MyCustomProcessor : Processing data : StepB : 0 Zero
MyCustomWriter : Writing data : StepB : 0 ZERO
MyCustomWriter : Writing data : completed. - StepB
MyCustomReader : Reading data : StepC : 0 Zero
MyCustomProcessor : Processing data : StepC : 0 Zero
MyCustomReader : Reading data : StepD : 0 Zero
MyCustomWriter : Writing data : StepC : 0 ZERO
MyCustomWriter : Writing data : completed. - StepC
MyCustomProcessor : Processing data : StepD : 0 Zero
MyCustomWriter : Writing data : StepD : 0 ZERO
MyCustomWriter : Writing data : completed. - StepD
MyCustomReader : Reading data : StepB : 1 One
MyCustomProcessor : Processing data : StepB : 1 One
MyCustomWriter : Writing data : StepB : 1 ONE
MyCustomWriter : Writing data : completed. - StepB
MyCustomReader : Reading data : StepC : 1 One
MyCustomProcessor : Processing data : StepC : 1 One
MyCustomWriter : Writing data : StepC : 1 ONE
MyCustomWriter : Writing data : completed. - StepC
MyCustomReader : Reading data : StepD : 1 One
MyCustomProcessor : Processing data : StepD : 1 One
MyCustomWriter : Writing data : StepD : 1 ONE
MyCustomWriter : Writing data : completed. - StepD
MyCustomReader : Reading data : StepB : 2 Two
MyCustomProcessor : Processing data : StepB : 2 Two
MyCustomReader : Reading data : StepC : 2 Two
MyCustomWriter : Writing data : StepB : 2 TWO
MyCustomWriter : Writing data : completed. - StepB
MyCustomProcessor : Processing data : StepC : 2 Two
MyCustomWriter : Writing data : StepC : 2 TWO
MyCustomWriter : Writing data : completed. - StepC
MyCustomReader : Reading data : StepD : 2 Two
MyCustomProcessor : Processing data : StepD : 2 Two
MyCustomWriter : Writing data : StepD : 2 TWO
MyCustomWriter : Writing data : completed. - StepD
2021-07-24 14:27:21.843 INFO 87719 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [MyStepC] executed in 10ms
2021-07-24 14:27:21.843 INFO 87719 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep : Step: [MyStepD] executed in 10ms
2021-07-24 14:27:21.847 INFO 87719 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [MyStepB] executed in 14ms
2021-07-24 14:27:21.854 INFO 87719 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=MyJob]] completed with the following parameters: [{run.id=2, time=2021-07-23 17:36:15.420}] and the following status: [COMPLETED] in 64ms