Spring Boot Batch with Parallel Processing

Spring Boot Batch with parallel processing runs several batch steps at the same time. Without it, running batch tasks concurrently would mean setting up several schedulers that fire at the same moment. Spring Batch instead lets a single job execute independent steps concurrently, which can shorten the total run time when the steps do not depend on each other.

In a parallel section, the job starts several steps concurrently and waits for all of them to finish before it moves on to the next step. Whether steps should run in parallel or in sequence depends on the data and the processing requirements: steps that share no data and have no ordering dependency on each other are good candidates for parallel execution.
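The fork-and-wait behaviour described above can be sketched in plain Java with java.util.concurrent. This is only an analogy and not how Spring Batch implements a split internally: a fixed thread pool stands in for the split's TaskExecutor, the step names mirror the example job, and NextStep is a hypothetical step after the split (the example job has none).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SplitSemanticsDemo {

    // A "step" here is just a task that records its name in a shared, thread-safe log.
    static Callable<String> step(String name, List<String> log) {
        return () -> {
            log.add(name);
            return name;
        };
    }

    // Runs StepA, then StepB/StepC/StepD concurrently, then the hypothetical NextStep
    // only after all three parallel tasks have finished.
    static List<String> runJob() throws Exception {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        step("StepA", log).call(); // sequential step before the split

        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> parallel =
                    List.of(step("StepB", log), step("StepC", log), step("StepD", log));
            // invokeAll blocks until every task has completed, normally or with an exception
            for (Future<String> result : pool.invokeAll(parallel)) {
                result.get(); // rethrows a task failure, if there was one
            }
        } finally {
            pool.shutdown();
        }

        step("NextStep", log).call(); // runs only after the join point
        return log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJob());
    }
}
```

The order of StepB, StepC and StepD in the printed list can change from run to run; StepA is always first and NextStep always last.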

This post shows how to configure Spring Boot Batch for parallel processing. The example job runs one step first and then three steps in parallel, and the step-by-step instructions below can be adapted to run batch steps either concurrently or sequentially.



Batch configurations

Spring Batch uses the Flow interface to achieve parallel processing. A flow defines which steps run and in what order. Flows are built with FlowBuilder: start() and next() chain steps sequentially, while split() runs several flows in parallel on a TaskExecutor. A job is made up of flows, each flow is made up of steps, and each step does the actual reading, processing and writing. All of these objects are defined in the batch configuration class.

BatchConfig.java

package com.yawintutor;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfig {
	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

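	@Bean
	public Job createJob() {
		// flowA runs first, on the thread that launched the job
		Flow flowA = new FlowBuilder<Flow>("flowA").start(createStepA()).build();

		// flowB, flowC and flowD are independent flows that will run concurrently
		Flow flowB = new FlowBuilder<Flow>("flowB").start(createStepB()).build();
		Flow flowC = new FlowBuilder<Flow>("flowC").start(createStepC()).build();
		Flow flowD = new FlowBuilder<Flow>("flowD").start(createStepD()).build();

		// split() hands each flow to the TaskExecutor; SimpleAsyncTaskExecutor starts a new thread
		// per flow, and the split completes only when all three flows have finished
		Flow parallelFlow = new FlowBuilder<Flow>("parallelFlow").split(new SimpleAsyncTaskExecutor())
				.add(flowB, flowC, flowD).build();

		// Job layout: flowA first, then the parallel split; end() closes the flow definition
		return jobBuilderFactory.get("MyJob").incrementer(new RunIdIncrementer()).start(flowA).next(parallelFlow).end()
				.build();
	}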

	@Bean
	public Step createStepA() {
		return stepBuilderFactory.get("MyStepA").<String, String>chunk(1).reader(new MyCustomReader("StepA"))
				.processor(new MyCustomProcessor("StepA")).writer(new MyCustomWriter("StepA")).build();
	}

	@Bean
	public Step createStepB() {
		return stepBuilderFactory.get("MyStepB").<String, String>chunk(1).reader(new MyCustomReader("StepB"))
				.processor(new MyCustomProcessor("StepB")).writer(new MyCustomWriter("StepB")).build();
	}

	@Bean
	public Step createStepC() {
		return stepBuilderFactory.get("MyStepC").<String, String>chunk(1).reader(new MyCustomReader("StepC"))
				.processor(new MyCustomProcessor("StepC")).writer(new MyCustomWriter("StepC")).build();
	}

	@Bean
	public Step createStepD() {
		return stepBuilderFactory.get("MyStepD").<String, String>chunk(1).reader(new MyCustomReader("StepD"))
				.processor(new MyCustomProcessor("StepD")).writer(new MyCustomWriter("StepD")).build();
	}
}
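SimpleAsyncTaskExecutor starts a new thread for every task and, by default, places no limit on how many run at once. With three flows that is harmless, but for a wider split a bounded executor (for example Spring's ThreadPoolTaskExecutor, or a concurrency limit on SimpleAsyncTaskExecutor) is a common choice. The sketch below uses only the JDK to illustrate the effect of a bound: a pool of two threads runs three hypothetical flow tasks, so at most two are active at any moment. The pool size and the 50 ms of simulated work are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSplitDemo {

    // Runs `flows` tasks on a pool of `poolSize` threads and returns the highest
    // number of tasks observed running at the same moment.
    static int maxConcurrency(int flows, int poolSize) throws Exception {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < flows; i++) {
            tasks.add(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                Thread.sleep(50);                      // simulated step work
                running.decrementAndGet();
                return null;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (Future<Void> f : pool.invokeAll(tasks)) {
                f.get(); // wait for every flow, like the split's join
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak concurrency: " + maxConcurrency(3, 2));
    }
}
```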


ItemReader Implementation

The ItemReader interface is used to create reader objects, which retrieve the data to be processed from a source. ItemReader declares a single read method. While a step runs, Spring Batch calls read repeatedly; each call returns the next item, and a return value of null signals that the input is exhausted and ends the step.
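The read contract can be modelled in plain Java: the caller keeps calling read() until it gets null. Because a reader like this keeps its position in a field, an instance that is reused across job runs (as happens when readers are created inside singleton step beans) has nothing left to read on the second run unless its position is reset. The ArrayReader class and its reset() method below are hypothetical; in Spring Batch, such a reset would typically happen in ItemStream.open(...) or by giving the reader step scope.

```java
import java.util.ArrayList;
import java.util.List;

public class ReaderContractDemo {

    // Hypothetical stand-in for MyCustomReader: returns one item per call, null when exhausted.
    static class ArrayReader {
        private final String[] items = { "Zero", "One", "Two" };
        private int index = 0;

        String read() {
            if (index >= items.length) {
                return null; // null signals "no more input" and ends the step
            }
            String data = index + " " + items[index];
            index++;
            return data;
        }

        void reset() {
            index = 0; // what an ItemStream.open(...) implementation might do
        }
    }

    // Drains the reader the way a step would: call read() until it returns null.
    static List<String> drain(ArrayReader reader) {
        List<String> result = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            result.add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        ArrayReader reader = new ArrayReader();
        System.out.println(drain(reader)); // [0 Zero, 1 One, 2 Two]
        System.out.println(drain(reader)); // [] : the same instance is already exhausted
        reader.reset();
        System.out.println(drain(reader)); // [0 Zero, 1 One, 2 Two]
    }
}
```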

MyCustomReader.java

package com.yawintutor;

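import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

// Implements ItemStreamReader (ItemReader + ItemStream) so that Spring Batch calls open()
// at the start of every step execution. The step beans are singletons, so the same reader
// instance is reused on every scheduled run and its position must be reset each time.
public class MyCustomReader implements ItemStreamReader<String> {

	private String[] stringArray = { "Zero", "One", "Two" };

	private int index = 0;
	
	private String name;
	
	public MyCustomReader(String name) {
		this.name = name;
	}

	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		index = 0; // start from the first item on every run
	}

	@Override
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		// no restart state is saved in this example
	}

	@Override
	public void close() throws ItemStreamException {
		// nothing to release
	}

	// Returns one item per call; returning null tells Spring Batch the input is exhausted
	@Override
	public String read() throws Exception {
		if (index >= stringArray.length) {
			return null;
		}
		
		String data = index + " " + stringArray[index];
		index++;
		System.out.println("MyCustomReader    : Reading data    : "+name+" : "+ data);
		return data;
	}
}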


ItemProcessor Implementation

The ItemProcessor interface is used to create processor objects, which transform the data returned by the reader. ItemProcessor declares a single process method, which Spring Batch calls once for each item that has been read; the value it returns is passed on to the writer.
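In Spring Batch, returning null from process filters the item out: it is not passed to the writer. The sketch below models that rule in plain Java. The filtering condition (dropping items that contain "One") is a hypothetical example; the original MyCustomProcessor only logs and upper-cases its input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ProcessorFilterDemo {

    // Hypothetical processor: upper-cases the item, but returns null to filter out "One".
    static final UnaryOperator<String> PROCESSOR = data ->
            data.contains("One") ? null : data.toUpperCase();

    // Applies the processor to each item and keeps only non-null results,
    // mirroring how filtered items never reach the writer.
    static List<String> processAll(List<String> items) {
        List<String> toWrite = new ArrayList<>();
        for (String item : items) {
            String processed = PROCESSOR.apply(item);
            if (processed != null) {
                toWrite.add(processed);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) {
        // prints [0 ZERO, 2 TWO]
        System.out.println(processAll(List.of("0 Zero", "1 One", "2 Two")));
    }
}
```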

MyCustomProcessor.java

package com.yawintutor;

import org.springframework.batch.item.ItemProcessor;

public class MyCustomProcessor implements ItemProcessor<String, String> {

	private String name;
	
	public MyCustomProcessor(String name) {
		this.name = name;
	}

	@Override
	public String process(String data) throws Exception {
		System.out.println("MyCustomProcessor : Processing data : "+name+" : "+data);
		data = data.toUpperCase();
		return data;
	}
}


ItemWriter Implementation

The ItemWriter interface is used to create writer objects, which store the processed data in a destination. ItemWriter declares a single write method. Spring Batch calls write once per chunk, passing the list of processed items in that chunk.
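Because the writer receives a whole chunk at once, the chunk size decides how often write is called. With chunk(1), as in BatchConfig, every chunk holds a single item, which is why the log prints a "completed" line after each item. The loop below is a simplified, single-threaded model of a chunk-oriented step (read up to the chunk size, process, then write) with no transactions, retries or skips; the runChunks helper is hypothetical and uses only JDK functional interfaces.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkLoopDemo {

    // Simplified model of a chunk-oriented step: read up to chunkSize items,
    // process them, then write the surviving items in a single call.
    static <I, O> void runChunks(Iterator<I> reader, Function<I, O> processor,
                                 Consumer<List<O>> writer, int chunkSize) {
        while (reader.hasNext()) {
            // 1. read up to chunkSize items
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. process each item; a null result means the item is filtered out
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O processed = processor.apply(item);
                if (processed != null) {
                    outputs.add(processed);
                }
            }
            // 3. hand the whole chunk to the writer at once
            writer.accept(outputs);
        }
    }

    public static void main(String[] args) {
        List<String> items = List.of("0 Zero", "1 One", "2 Two");
        Function<String, String> upperCase = String::toUpperCase;
        for (int size : new int[] { 1, 2 }) {
            List<List<String>> writes = new ArrayList<>();
            Consumer<List<String>> collect = writes::add;
            runChunks(items.iterator(), upperCase, collect, size);
            System.out.println("chunk(" + size + ") -> " + writes);
        }
    }
}
```

Running main prints chunk(1) -> [[0 ZERO], [1 ONE], [2 TWO]] and chunk(2) -> [[0 ZERO, 1 ONE], [2 TWO]]: three write calls with a chunk size of 1, two with a chunk size of 2.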

MyCustomWriter.java

package com.yawintutor;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MyCustomWriter implements ItemWriter<String> {

	private String name;
	
	public MyCustomWriter(String name) {
		this.name = name;
	}

	@Override
	public void write(List<? extends String> list) throws Exception {
		for (String data : list) {
			System.out.println("MyCustomWriter    : Writing data    : "+name+" : " + data);
		}
		System.out.println("MyCustomWriter    : Writing data    : completed. - "+name);
	}
}


Scheduler Configuration

The scheduler configuration runs the batch job at regular intervals. With @EnableScheduling, Spring calls the method annotated with @Scheduled five seconds after the application starts and then five seconds after each previous run finishes. The method builds a new set of job parameters and passes the job to JobLauncher, which runs it.
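The fixedDelay setting measures the pause from the end of one run to the start of the next, so a slow job run simply pushes the next run back (fixedRate would instead measure from the start of each run). The JDK's ScheduledExecutorService.scheduleWithFixedDelay has the same fixed-delay semantics; the sketch below uses it with shortened, hypothetical timings (100 ms instead of 5000 ms) so that it finishes quickly, and a print statement stands in for the call to JobLauncher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {

    // Schedules a task with an initial delay and a fixed delay between the end of one
    // run and the start of the next, then waits until it has run `runs` times.
    static int runWithFixedDelay(int runs, long delayMillis) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                int n = counter.incrementAndGet();
                System.out.println("Batch job run " + n); // stand-in for jobLauncher.run(...)
                done.countDown();
            }, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
            done.await(10, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow(); // stop further runs
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        runWithFixedDelay(3, 100); // the real configuration uses 5000 ms for both delays
    }
}
```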

SchedulerConfig.java

package com.yawintutor;

import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
public class SchedulerConfig {

	@Autowired
	JobLauncher jobLauncher;

	@Autowired
	Job job;

	SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");

	// First run 5 seconds after startup; each later run starts 5 seconds after the previous one finishes
	@Scheduled(fixedDelay = 5000, initialDelay = 5000)
	public void scheduleByFixedDelay() throws Exception {
		System.out.println("Batch job starting");
		// A fresh "time" value gives each launch a new job instance; launching an already
		// completed instance with identical parameters would be rejected by Spring Batch
		JobParameters jobParameters = new JobParametersBuilder()
				.addString("time", format.format(Calendar.getInstance().getTime())).toJobParameters();
		// The default JobLauncher runs the job synchronously and returns once it has ended
		JobExecution execution = jobLauncher.run(job, jobParameters);
		System.out.println("Batch job finished with status " + execution.getStatus() + "\n");
	}
}


Spring Boot Main Method

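The main class below starts the Spring Boot application when its main method runs. Because BatchConfig and SchedulerConfig are in the same package (com.yawintutor), @SpringBootApplication picks them up through component scanning.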

SpringBootBatch3Application.java

package com.yawintutor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootBatch3Application {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootBatch3Application.class, args);
	}
}


Application properties file

The application.properties file contains two properties. The first sets the database URL (a file-based H2 database). The second tells Spring Boot to always create the Spring Batch metadata tables, which store the job and step execution history, in that database at startup.

application.properties

spring.datasource.url=jdbc:h2:file:./DB
spring.batch.jdbc.initialize-schema=always


pom.xml

The pom.xml file declares the Spring Boot Batch starter, the test dependencies and the H2 database driver. The spring-boot-starter-batch dependency pulls in Spring Batch and the other libraries it needs.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.5.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.yawintutor</groupId>
	<artifactId>SpringBootBatch3</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SpringBootBatch3</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>		
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>


How to run Spring Boot Batch

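When the application starts, Spring Boot launches the job once by default (the spring.batch.job.enabled property defaults to true), and the scheduler then launches it again every five seconds. JobLauncher runs MyStepA first; after it completes, MyStepB, MyStepC and MyStepD start at almost the same moment on separate SimpleAsyncTaskExecutor threads (the thread names are truncated to cTaskExecutor-1 to cTaskExecutor-3 in the log), so their reader, processor and writer output is interleaved. The job completes only after all three parallel steps have finished. A typical run produces log output like the following: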

2021-07-24 14:27:21.760  INFO 87719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=MyJob]] launched with the following parameters: [{run.id=2, time=2021-07-23 17:36:15.420}]
2021-07-24 14:27:21.802  INFO 87719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [MyStepA]
MyCustomReader    : Reading data    : StepA : 0 Zero
MyCustomProcessor : Processing data : StepA : 0 Zero
MyCustomWriter    : Writing data    : StepA : 0 ZERO
MyCustomWriter    : Writing data    : completed. - StepA
MyCustomReader    : Reading data    : StepA : 1 One
MyCustomProcessor : Processing data : StepA : 1 One
MyCustomWriter    : Writing data    : StepA : 1 ONE
MyCustomWriter    : Writing data    : completed. - StepA
MyCustomReader    : Reading data    : StepA : 2 Two
MyCustomProcessor : Processing data : StepA : 2 Two
MyCustomWriter    : Writing data    : StepA : 2 TWO
MyCustomWriter    : Writing data    : completed. - StepA
2021-07-24 14:27:21.823  INFO 87719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [MyStepA] executed in 21ms
2021-07-24 14:27:21.832  INFO 87719 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler     : Executing step: [MyStepC]
2021-07-24 14:27:21.832  INFO 87719 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [MyStepB]
2021-07-24 14:27:21.833  INFO 87719 --- [cTaskExecutor-3] o.s.batch.core.job.SimpleStepHandler     : Executing step: [MyStepD]
MyCustomReader    : Reading data    : StepB : 0 Zero
MyCustomProcessor : Processing data : StepB : 0 Zero
MyCustomWriter    : Writing data    : StepB : 0 ZERO
MyCustomWriter    : Writing data    : completed. - StepB
MyCustomReader    : Reading data    : StepC : 0 Zero
MyCustomProcessor : Processing data : StepC : 0 Zero
MyCustomReader    : Reading data    : StepD : 0 Zero
MyCustomWriter    : Writing data    : StepC : 0 ZERO
MyCustomWriter    : Writing data    : completed. - StepC
MyCustomProcessor : Processing data : StepD : 0 Zero
MyCustomWriter    : Writing data    : StepD : 0 ZERO
MyCustomWriter    : Writing data    : completed. - StepD
MyCustomReader    : Reading data    : StepB : 1 One
MyCustomProcessor : Processing data : StepB : 1 One
MyCustomWriter    : Writing data    : StepB : 1 ONE
MyCustomWriter    : Writing data    : completed. - StepB
MyCustomReader    : Reading data    : StepC : 1 One
MyCustomProcessor : Processing data : StepC : 1 One
MyCustomWriter    : Writing data    : StepC : 1 ONE
MyCustomWriter    : Writing data    : completed. - StepC
MyCustomReader    : Reading data    : StepD : 1 One
MyCustomProcessor : Processing data : StepD : 1 One
MyCustomWriter    : Writing data    : StepD : 1 ONE
MyCustomWriter    : Writing data    : completed. - StepD
MyCustomReader    : Reading data    : StepB : 2 Two
MyCustomProcessor : Processing data : StepB : 2 Two
MyCustomReader    : Reading data    : StepC : 2 Two
MyCustomWriter    : Writing data    : StepB : 2 TWO
MyCustomWriter    : Writing data    : completed. - StepB
MyCustomProcessor : Processing data : StepC : 2 Two
MyCustomWriter    : Writing data    : StepC : 2 TWO
MyCustomWriter    : Writing data    : completed. - StepC
MyCustomReader    : Reading data    : StepD : 2 Two
MyCustomProcessor : Processing data : StepD : 2 Two
MyCustomWriter    : Writing data    : StepD : 2 TWO
MyCustomWriter    : Writing data    : completed. - StepD
2021-07-24 14:27:21.843  INFO 87719 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Step: [MyStepC] executed in 10ms
2021-07-24 14:27:21.843  INFO 87719 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep         : Step: [MyStepD] executed in 10ms
2021-07-24 14:27:21.847  INFO 87719 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep         : Step: [MyStepB] executed in 14ms
2021-07-24 14:27:21.854  INFO 87719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=MyJob]] completed with the following parameters: [{run.id=2, time=2021-07-23 17:36:15.420}] and the following status: [COMPLETED] in 64ms


