Spring Boot Batch Partitioning Example

Spring Batch partitioning speeds up a Spring Boot batch job by running it on multiple threads. The partitioner divides the input into partitions, and each partition is processed by its own thread. In this example, each CSV file becomes one partition, so every file is processed in a separate thread.

In this post, we’ll explore how to create, configure, and run a Spring Boot batch partitioner. In this partitioning example, multiple CSV files are processed concurrently to speed up the batch job.



Sample CSV Data Files

This example uses two sample CSV files. Both contain employee details: id, name and salary. The files are stored in the data folder of the project.

data/input.csv

id,name,salary
1,name01,1000
2,name02,2000
3,name03,3000

data/input2.csv

id,name,salary
21,name21,21000
22,name22,22000
23,name23,23000


Data Model class

This example uses a Java bean, Employee, as the data model. The Employee class has fields for id, name and salary. Spring Batch reads each row of a CSV file and maps its columns to the fields of an Employee object.

Employee.java

package com.yawintutor;

public class Employee {
	private int id;
	private String name;
	private int salary;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getSalary() {
		return salary;
	}
	public void setSalary(int salary) {
		this.salary = salary;
	}	
}
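To make that mapping concrete, here is a plain-Java sketch of what happens to each line: skip the header, split on commas, and convert the three columns into typed fields. It is only an illustration, not Spring Batch's implementation. EmployeeRow and parse are hypothetical names standing in for the Employee bean and for the FlatFileItemReader, DelimitedLineTokenizer and BeanWrapperFieldSetMapper combination configured in BatchConfig. Like DelimitedLineTokenizer in its default strict mode, the sketch rejects a line with the wrong number of columns; skipping blank lines is an extra assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvMappingSketch {

    // Hypothetical stand-in for the article's Employee bean, used only in this sketch
    static final class EmployeeRow {
        final int id;
        final String name;
        final int salary;

        EmployeeRow(int id, String name, int salary) {
            this.id = id;
            this.name = name;
            this.salary = salary;
        }
    }

    // Skips the header lines, splits each remaining line on commas and
    // converts the columns (id, name, salary) into typed fields.
    static List<EmployeeRow> parse(List<String> lines, int linesToSkip) {
        List<EmployeeRow> rows = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            String line = lines.get(i).trim();
            if (line.isEmpty()) {
                continue; // this sketch ignores blank lines
            }
            String[] tokens = line.split(",");
            if (tokens.length != 3) {
                throw new IllegalArgumentException("Expected 3 columns at line " + (i + 1) + ": " + line);
            }
            rows.add(new EmployeeRow(Integer.parseInt(tokens[0].trim()), tokens[1].trim(),
                    Integer.parseInt(tokens[2].trim())));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> input = List.of("id,name,salary", "1,name01,1000", "2,name02,2000", "3,name03,3000");
        for (EmployeeRow row : parse(input, 1)) {
            System.out.println(row.id + " : " + row.name + " : " + row.salary);
        }
    }
}
```

Calling parse with linesToSkip set to 1 corresponds to setLinesToSkip(1) in the reader configuration, which drops the id,name,salary header.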


Spring Boot Batch Configurations

The batch job and step configurations are defined in the Spring Batch configuration class. The FlatFileItemReader class reads and parses a CSV file; it is configured with the column names and the number of header lines to skip. The MultiResourcePartitioner class creates the partitions: it finds all CSV files in the specified folder and creates one partition per file, storing the file's location in that partition's step execution context under the key fileName.
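The partitions themselves are just named bags of key/value data. The sketch below models what MultiResourcePartitioner produces, using plain maps in place of Spring Batch's ExecutionContext: one entry per file, named partition0, partition1 and so on, each holding the file URL under the key fileName. The class and method names are hypothetical, and the real partitioner takes the URL from each Spring Resource rather than from java.io.File.

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionMapSketch {

    // One partition per file: "partition0", "partition1", ...
    // Each partition's context stores the file URL under "fileName".
    static Map<String, Map<String, String>> partition(List<File> files) {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        int index = 0;
        for (File file : files) {
            Map<String, String> context = new LinkedHashMap<>();
            context.put("fileName", file.toURI().toString()); // e.g. file:/.../data/input.csv
            partitions.put("partition" + index++, context);
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> partitions =
                partition(List.of(new File("data/input.csv"), new File("data/input2.csv")));
        partitions.forEach((name, context) -> System.out.println(name + " -> " + context));
    }
}
```

Because the value is a URL rather than a plain path, the step-scoped reader has to turn it back into a resource before opening the file.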

The master step uses the MultiResourcePartitioner to create the partitions. Based on the partitioning configuration, Spring Batch runs the slave step once for each partition, each on a separate thread from the task executor. Each thread reads the data from its assigned CSV file and processes it according to the slave step configuration, which uses a FlatFileItemReader to read the records and an ItemWriter to write them.
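The master/slave split can be pictured with plain java.util.concurrent. In the sketch below, a "master" submits one task per partition to a thread pool and waits for every result, and each "slave" task handles only its own records. This is a simplified model assuming in-memory partitions; runPartitions and processPartition are hypothetical names. In the real job, Spring Batch's partition handler (by default a TaskExecutorPartitionHandler) does this with the configured TaskExecutor and records each worker step execution in the job repository.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MasterSlaveSketch {

    // "Master": submits one worker task per partition, then waits for all of them.
    static Map<String, Integer> runPartitions(Map<String, List<String>> partitions) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            Map<String, Future<Integer>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> entry : partitions.entrySet()) {
                List<String> records = entry.getValue();
                futures.put(entry.getKey(), executor.submit(() -> processPartition(records)));
            }
            Map<String, Integer> writtenPerPartition = new LinkedHashMap<>();
            for (Map.Entry<String, Future<Integer>> entry : futures.entrySet()) {
                // get() blocks until that worker has finished
                writtenPerPartition.put(entry.getKey(), entry.getValue().get());
            }
            return writtenPerPartition;
        } finally {
            executor.shutdown();
        }
    }

    // "Slave": processes only the records of its own partition and reports how many it wrote.
    static int processPartition(List<String> records) {
        int written = 0;
        for (String record : records) {
            System.out.println(Thread.currentThread().getName() + " writing " + record);
            written++;
        }
        return written;
    }

    public static void main(String[] args) throws Exception {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        partitions.put("partition0", List.of("1,name01,1000", "2,name02,2000", "3,name03,3000"));
        partitions.put("partition1", List.of("21,name21,21000", "22,name22,22000", "23,name23,23000"));
        System.out.println(runPartitions(partitions));
    }
}
```

Because the two workers run at the same time, their printed lines can interleave in any order, which is the same effect visible in the job log at the end of this post.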

BatchConfig.java

package com.yawintutor;

import java.io.File;
import java.net.URI;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfig {
	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;
	
	@Bean
	public Job partitioningJob() throws Exception {
		return jobBuilderFactory.get("partitioningJob")
				.incrementer(new RunIdIncrementer())
				.flow(createMasterStep())
				.end()
				.build();
	}
	
	// Master step: gets the partitions from the partitioner and runs the
	// slave step once per partition, each on its own thread
	@Bean
	public Step createMasterStep() throws Exception {
		return stepBuilderFactory.get("MasterStep")
				.partitioner("partition", createPartitioner())
				.step(createSlaveStep())
				// MultiResourcePartitioner ignores gridSize: it always creates one partition per file
				.gridSize(5)
				// SimpleAsyncTaskExecutor starts a new thread for each partition
				.taskExecutor(new SimpleAsyncTaskExecutor())
				.build();
	}
	 
	// One partition per CSV file in ./data; each partition's step execution
	// context holds the file URL under the key "fileName"
	@Bean
	public Partitioner createPartitioner() throws Exception {
		MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
		PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
		partitioner.setResources(resolver.getResources("file:./data/*.csv"));
		return partitioner;
	}

	// Slave step: reads the records of one file and writes them, one record per chunk
	@Bean
	public Step createSlaveStep() {
		return stepBuilderFactory.get("SlaveStep")
				.<Employee, Employee>chunk(1)
				// null is only a placeholder: the step-scoped reader receives the
				// real fileName when each partition runs
				.reader(itemReader(null))
				.writer(itemWriter())
				.build();
	}
	
	
	@StepScope
	@Bean
	public FlatFileItemReader<Employee> itemReader(
	  @Value("#{stepExecutionContext['fileName']}") String fileName)
	  throws UnexpectedInputException, ParseException {
		FlatFileItemReader<Employee> flatFileItemReader = new FlatFileItemReader<Employee>();
		// fileName is a URL such as "file:/.../data/input.csv"; converting it through URI
		// (instead of cutting off "file:") also handles escaped characters and Windows drive letters
		flatFileItemReader.setResource(new FileSystemResource(new File(URI.create(fileName))));
		
		// Split each line on commas and name the columns
		DefaultLineMapper<Employee> defaultLineMapper = new DefaultLineMapper<Employee>();
		DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
		delimitedLineTokenizer.setNames(new String[] { "id", "name", "salary" });
		defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
		
		// Copy the named columns into the matching Employee properties
		BeanWrapperFieldSetMapper<Employee> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
		beanWrapperFieldSetMapper.setTargetType(Employee.class);
		defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);

		flatFileItemReader.setLinesToSkip(1); // skip the CSV header line
		flatFileItemReader.setLineMapper(defaultLineMapper);

		return flatFileItemReader;
	}
	
	// Prints each record of the chunk to the console
	@Bean
	public ItemWriter<Employee> itemWriter() {
		return new ItemWriter<Employee>() {
			@Override
			public void write(List<? extends Employee> list) throws Exception {
				for (Employee data : list) {
					System.out.println("MyCustomWriter    : Writing data    : " + data.getId()+" : "+data.getName()+" : "+data.getSalary());
				}
			}
		};
	}

}
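The slave step is chunk-oriented: items are read one at a time, collected into a chunk, and the whole chunk is passed to the ItemWriter. The sketch below models that loop in plain Java under simplifying assumptions: an Iterator plays the reader (Spring Batch readers signal the end by returning null instead), a Consumer plays the writer, and the transactions, retries and skips that Spring Batch adds around each chunk are left out. ChunkLoopSketch and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class ChunkLoopSketch {

    // Reads items one by one, buffers up to chunkSize of them, then hands the
    // buffer to the writer. Returns the number of chunks written.
    static <T> int run(Iterator<T> reader, int chunkSize, Consumer<List<T>> writer) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        int chunks = 0;
        List<T> buffer = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            buffer.add(reader.next());
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer)); // pass a copy, then reuse the buffer
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) { // final, partially filled chunk
            writer.accept(new ArrayList<>(buffer));
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> items = List.of("1,name01,1000", "2,name02,2000", "3,name03,3000");
        // chunk size 1, as in .<Employee, Employee>chunk(1): every item is its own chunk
        int chunks = run(items.iterator(), 1, list -> System.out.println("writing " + list));
        System.out.println(chunks + " chunks");
    }
}
```

With chunk(1) each write call receives a single record, which keeps the example output easy to follow; a larger chunk size reduces the number of writer calls and commits per file.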


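Scheduler class

The SchedulerConfig class launches the batch job on a timer. @EnableScheduling turns on Spring's scheduled task support, and @Scheduled(fixedDelay = 5000, initialDelay = 5000) runs the method 5 seconds after startup and then 5 seconds after each previous run has finished. The current timestamp is passed as the "time" job parameter so that every launch creates a new job instance.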

SchedulerConfig.java

package com.yawintutor;

import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
public class SchedulerConfig {

	@Autowired
	JobLauncher jobLauncher;

	@Autowired
	Job job;

	SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");

	// First run 5 s after startup, then 5 s after each previous run has finished
	@Scheduled(fixedDelay = 5000, initialDelay = 5000)
	public void runBatchJob() throws Exception {
		System.out.println("Batch job starting");
		JobParameters jobParameters = new JobParametersBuilder()
				.addString("time", format.format(Calendar.getInstance().getTime())).toJobParameters();
		jobLauncher.run(job, jobParameters);
		System.out.println("Batch job executed successfully\n");
	}
}
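The two ideas in SchedulerConfig, a fixed delay between runs and a unique timestamp parameter per run, can be tried out with the JDK alone. In the sketch below, ScheduledExecutorService.scheduleWithFixedDelay waits the given delay after one run ends before starting the next, the same semantics as fixedDelay. The timestamp matters because Spring Batch identifies a job instance by the job name plus its identifying parameters, and relaunching an already completed instance with identical parameters fails. FixedDelaySketch, runWithFixedDelay and timeParameter are hypothetical names, and the short delays are chosen only so the demo finishes quickly.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelaySketch {

    // Same pattern as SchedulerConfig; a new formatter per call because SimpleDateFormat is not thread-safe
    static String timeParameter(Date now) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(now);
    }

    // Runs the task `runs` times, waiting delayMs after each run ENDS before starting the next.
    static int runWithFixedDelay(Runnable task, long initialDelayMs, long delayMs, int runs)
            throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(runs);
        AtomicInteger completed = new AtomicInteger();
        scheduler.scheduleWithFixedDelay(() -> {
            if (completed.get() >= runs) {
                return; // enough runs; shutdown is on its way
            }
            try {
                task.run();
            } catch (RuntimeException e) {
                // an exception escaping here would suppress all later runs, so log it instead
                System.err.println("Run failed: " + e);
            }
            completed.incrementAndGet();
            done.countDown();
        }, initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } finally {
            scheduler.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 50 ms instead of the article's 5000 ms so the demo finishes quickly
        int runs = runWithFixedDelay(
                () -> System.out.println("Batch job starting, time=" + timeParameter(new Date())),
                50, 50, 3);
        System.out.println(runs + " runs");
    }
}
```

Since the timestamp includes milliseconds and runs are at least one delay apart, each launch gets a different "time" value and therefore a new job instance.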


Spring boot Batch Main Class

The main class of the Spring Boot batch application is shown below. It is the default class generated for a Spring Boot project; its main method starts the application.

SpringBootBatch4Application.java

package com.yawintutor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootBatch4Application {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootBatch4Application.class, args);
	}
}


Application properties details

The application properties are stored in the application.properties file in the resources folder. The application uses two properties: the H2 database URL, and a setting that makes Spring Batch create its metadata tables in the database on startup.

application.properties

spring.datasource.url=jdbc:h2:file:./DB
spring.batch.jdbc.initialize-schema=always


Application configuration using pom.xml file

The application's dependencies are declared in the Maven pom.xml file, which includes the dependencies for Spring Boot Batch and the H2 database. If you use a database other than H2, add the dependency for that database to pom.xml.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.5.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.yawintutor</groupId>
	<artifactId>SpringBootBatch4</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SpringBootBatch4</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>		
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>


How to run

Five seconds after the application starts, the scheduler calls the JobLauncher, which runs the partitioned batch job. The log shows that the two partitions run concurrently on separate SimpleAsyncTaskExecutor threads (the thread names appear truncated as cTaskExecutor-9 and TaskExecutor-10), so the records from the two CSV files are written interleaved.

Batch job starting
2021-07-25 12:13:19.765  INFO 6127 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=partitioningJob]] launched with the following parameters: [{time=2021-07-25 12:13:19.761}]
2021-07-25 12:13:19.770  INFO 6127 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [MasterStep]
MyCustomWriter    : Writing data    : 21 : name21 : 21000
MyCustomWriter    : Writing data    : 1 : name01 : 1000
MyCustomWriter    : Writing data    : 22 : name22 : 22000
MyCustomWriter    : Writing data    : 2 : name02 : 2000
MyCustomWriter    : Writing data    : 23 : name23 : 23000
MyCustomWriter    : Writing data    : 3 : name03 : 3000
2021-07-25 12:13:19.782  INFO 6127 --- [cTaskExecutor-9] o.s.batch.core.step.AbstractStep         : Step: [SlaveStep:partition0] executed in 9ms
2021-07-25 12:13:19.782  INFO 6127 --- [TaskExecutor-10] o.s.batch.core.step.AbstractStep         : Step: [SlaveStep:partition1] executed in 9ms
2021-07-25 12:13:19.783  INFO 6127 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [MasterStep] executed in 13ms
2021-07-25 12:13:19.784  INFO 6127 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=partitioningJob]] completed with the following parameters: [{time=2021-07-25 12:13:19.761}] and the following status: [COMPLETED] in 18ms
Batch job executed successfully


