Spring Boot Batch with Chunk Oriented Processing

Spring Boot Batch is a lightweight, robust framework for processing batch data autonomously, without user intervention. It supports two step execution styles: Tasklet and chunk-oriented processing. Chunk processing speeds up data processing and reduces database and network connection overhead by grouping items into a single transaction.

Chunk processing involves three tasks: ItemReader, ItemProcessor, and ItemWriter. The ItemReader reads data from a source, the ItemProcessor transforms each item, and the ItemWriter writes the results to the target. When the Spring Boot batch job runs, the reader fetches items, the processor transforms them, and the writer persists them to the destination. This cycle repeats until the reader returns null, signalling that the input is exhausted.

Chunk-oriented processing handles one block of data at a time. If the chunk size is set to a value greater than one, for example 5, the batch will call the ItemReader's read method 5 times and the ItemProcessor's process method 5 times. The ItemWriter's write method is then called only once, receiving all 5 processed items. All five processed items can be committed to a database or transmitted over the network in a single operation.
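The read/process/write cycle described above can be sketched in plain Java. This is a simplified illustration of the chunk loop, not Spring Batch's actual internals; the class and method names here are invented for the example:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {

    // Simulates one step execution: read up to chunkSize items,
    // process each of them, then write the whole chunk in a single call.
    // Returns the number of times "write" was invoked.
    public static int run(List<String> source, int chunkSize, List<List<String>> writes) {
        Iterator<String> reader = source.iterator();
        int writeCalls = 0;
        while (reader.hasNext()) {
            // Read phase: fill the chunk, item by item.
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(reader.next());
            }
            // Process phase: transform each item in the chunk.
            List<String> processed = new ArrayList<>();
            for (String item : chunk) {
                processed.add(item.toUpperCase());
            }
            // Write phase: one call per chunk, receiving all processed items.
            writes.add(processed);
            writeCalls++;
        }
        return writeCalls;
    }

    public static void main(String[] args) {
        List<List<String>> writes = new ArrayList<>();
        // With 6 items and chunk size 6, the writer is invoked only once.
        int calls = run(List.of("Zero", "One", "Two", "Three", "Four", "Five"), 6, writes);
        System.out.println("write called " + calls + " time(s)");
    }
}
```

With chunk size 1 the same input would trigger six separate write calls, which is exactly the connection-overhead difference the two log sections below demonstrate.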

This example reuses the Spring Boot batch classes from the previous post. The basic Java classes can be found at the link below.

https://www.yawintutor.com/spring-boot-batch-example-step-by-step/



Default Chunk Implementation

The code below illustrates a chunk configuration with the chunk size set to 1. The reader fetches one item, the processor transforms it, and the writer publishes it to the destination; all three tasks execute for every single item. In this scenario, the database or network connection is opened and closed each time an item is written.

	@Bean
	public Step createStep() {
		return stepBuilderFactory.get("MyStep")
				.<String, String> chunk(1)
				.reader(new MyCustomReader())
				.processor(new MyCustomProcessor())
				.writer(new MyCustomWriter())
				.build();
	}

Complete Spring Boot configuration file – BatchConfig.java

package com.yawintutor;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {
	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Bean
	public Job createJob() {
		return jobBuilderFactory.get("MyJob")
				.incrementer(new RunIdIncrementer())
				.flow(createStep()).end().build();
	}
	
	@Bean
	public Step createStep() {
		return stepBuilderFactory.get("MyStep")
				.<String, String> chunk(1)
				.reader(new MyCustomReader())
				.processor(new MyCustomProcessor())
				.writer(new MyCustomWriter())
				.build();
	}
	
}


Default Chunk logs

With a chunk size of 1, the logs appear as shown below. Each item is read, processed, and written to the destination individually, and the log shows this cycle repeating for every item.

2021-07-23 11:15:16.982  INFO 44295 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=MyJob]] launched with the following parameters: [{run.id=4, time=2021-07-22 16:22:57.293}]
2021-07-23 11:15:17.023  INFO 44295 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [MyStep]
MyCustomReader    : Reading data    : 0 Zero
MyCustomProcessor : Processing data : 0 Zero
MyCustomWriter    : Writing data    : 0 ZERO
MyCustomWriter    : Writing data    : completed
MyCustomReader    : Reading data    : 1 One
MyCustomProcessor : Processing data : 1 One
MyCustomWriter    : Writing data    : 1 ONE
MyCustomWriter    : Writing data    : completed
MyCustomReader    : Reading data    : 2 Two
MyCustomProcessor : Processing data : 2 Two
MyCustomWriter    : Writing data    : 2 TWO
MyCustomWriter    : Writing data    : completed
MyCustomReader    : Reading data    : 3 Three
MyCustomProcessor : Processing data : 3 Three
MyCustomWriter    : Writing data    : 3 THREE
MyCustomWriter    : Writing data    : completed
MyCustomReader    : Reading data    : 4 Four
MyCustomProcessor : Processing data : 4 Four
MyCustomWriter    : Writing data    : 4 FOUR
MyCustomWriter    : Writing data    : completed
MyCustomReader    : Reading data    : 5 Five
MyCustomProcessor : Processing data : 5 Five
MyCustomWriter    : Writing data    : 5 FIVE
MyCustomWriter    : Writing data    : completed
2021-07-23 11:15:17.052  INFO 44295 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [MyStep] executed in 29ms
2021-07-23 11:15:17.057  INFO 44295 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=MyJob]] completed with the following parameters: [{run.id=4, time=2021-07-22 16:22:57.293}] and the following status: [COMPLETED] in 42ms


Connectivity optimisation using chunk size

In the following example, the chunk size is set to a value larger than 1. The ItemReader's read method is called the configured number of times before processing begins, and the ItemProcessor then processes all of the read items. Finally, the ItemWriter is called once with all of the processed items and writes them in a single call.

	@Bean
	public Step createStep() {
		return stepBuilderFactory.get("MyStep")
				.<String, String> chunk(6)
				.reader(new MyCustomReader())
				.processor(new MyCustomProcessor())
				.writer(new MyCustomWriter())
				.build();
	}

Complete Spring Boot configuration file – BatchConfig.java

package com.yawintutor;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {
	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Bean
	public Job createJob() {
		return jobBuilderFactory.get("MyJob")
				.incrementer(new RunIdIncrementer())
				.flow(createStep()).end().build();
	}
	
	@Bean
	public Step createStep() {
		return stepBuilderFactory.get("MyStep")
				.<String, String> chunk(6)
				.reader(new MyCustomReader())
				.processor(new MyCustomProcessor())
				.writer(new MyCustomWriter())
				.build();
	}
	
}


Logs for a chunk size greater than 1

As the logs below show, the item writer's write method is called only once, while the item reader's read method and the item processor's process method are each called once per item in the chunk.

2021-07-23 11:19:12.242  INFO 44491 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=MyJob]] launched with the following parameters: [{run.id=5, time=2021-07-22 16:22:57.293}]
2021-07-23 11:19:12.276  INFO 44491 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [MyStep]
MyCustomReader    : Reading data    : 0 Zero
MyCustomReader    : Reading data    : 1 One
MyCustomReader    : Reading data    : 2 Two
MyCustomReader    : Reading data    : 3 Three
MyCustomReader    : Reading data    : 4 Four
MyCustomReader    : Reading data    : 5 Five
MyCustomProcessor : Processing data : 0 Zero
MyCustomProcessor : Processing data : 1 One
MyCustomProcessor : Processing data : 2 Two
MyCustomProcessor : Processing data : 3 Three
MyCustomProcessor : Processing data : 4 Four
MyCustomProcessor : Processing data : 5 Five
MyCustomWriter    : Writing data    : 0 ZERO
MyCustomWriter    : Writing data    : 1 ONE
MyCustomWriter    : Writing data    : 2 TWO
MyCustomWriter    : Writing data    : 3 THREE
MyCustomWriter    : Writing data    : 4 FOUR
MyCustomWriter    : Writing data    : 5 FIVE
MyCustomWriter    : Writing data    : completed
2021-07-23 11:19:12.295  INFO 44491 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [MyStep] executed in 19ms
2021-07-23 11:19:12.301  INFO 44491 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=MyJob]] completed with the following parameters: [{run.id=5, time=2021-07-22 16:22:57.293}] and the following status: [COMPLETED] in 34ms
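The relationship between item count, chunk size, and write invocations is simple arithmetic: the writer is called once per chunk, i.e. ceil(items / chunkSize) times. A tiny helper (hypothetical, for illustration only) makes this concrete:

```java
public class WriteCallCount {

    // Number of write() calls for n items with chunk size c: ceil(n / c),
    // computed with integer arithmetic.
    static int writeCalls(int n, int c) {
        return (n + c - 1) / c;
    }

    public static void main(String[] args) {
        System.out.println(writeCalls(6, 1)); // 6 writes with chunk size 1
        System.out.println(writeCalls(6, 6)); // 1 write with chunk size 6
    }
}
```

This matches the two log sections above: six write cycles with chunk size 1, and a single one with chunk size 6.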


Chunk Tasks

As previously mentioned, chunk processing consists of three tasks: ItemReader, ItemProcessor, and ItemWriter. The three implementation classes are given below.



ItemReader Implementation

MyCustomReader.java

package com.yawintutor;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class MyCustomReader implements ItemReader<String>{

	private String[] stringArray = { "Zero", "One", "Two", "Three", "Four", "Five" };

	private int index = 0;

	@Override
	public String read() throws Exception, UnexpectedInputException,
			ParseException, NonTransientResourceException {
		if (index >= stringArray.length) {
			return null;
		}
		
		String data = index + " " + stringArray[index];
		index++;
		System.out.println("MyCustomReader    : Reading data    : "+ data);
		return data;
	}
}
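Note the contract MyCustomReader follows: read() returns one item per call and null once the input is exhausted, which is how the step knows the chunk loop is finished. A plain-Java mimic of that contract (without the Spring interface, so it runs standalone; the class name is made up for this sketch):

```java
public class NullTerminatedReader {

    private final String[] data = { "Zero", "One", "Two" };
    private int index = 0;

    // Returns the next item, or null when there is nothing left to read.
    String read() {
        return index < data.length ? data[index++] : null;
    }

    public static void main(String[] args) {
        NullTerminatedReader reader = new NullTerminatedReader();
        String item;
        // The step's read loop ends when read() returns null.
        while ((item = reader.read()) != null) {
            System.out.println(item);
        }
    }
}
```

Because the reader keeps its position in an instance field, a fresh instance is needed for each run; otherwise a second execution would start with the input already exhausted.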


ItemProcessor Implementation

MyCustomProcessor.java

package com.yawintutor;

import org.springframework.batch.item.ItemProcessor;

public class MyCustomProcessor implements ItemProcessor<String, String> {

	@Override
	public String process(String data) throws Exception {
		System.out.println("MyCustomProcessor : Processing data : "+data);
		data = data.toUpperCase();
		return data;
	}
}
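One behaviour worth knowing: in Spring Batch, if process() returns null the item is filtered out of the chunk and never reaches the writer. The sketch below mimics that rule in plain Java (the class and helper names are invented for this illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class FilteringSketch {

    // Mimics the processor contract: returning null drops the item.
    static String process(String item) {
        return item.isEmpty() ? null : item.toUpperCase();
    }

    // Applies the processor to a chunk, skipping filtered (null) results,
    // so the writer only ever sees the surviving items.
    static List<String> applyToChunk(List<String> chunk) {
        List<String> out = new ArrayList<>();
        for (String item : chunk) {
            String processed = process(item);
            if (processed != null) {
                out.add(processed); // filtered items never reach the writer
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(applyToChunk(List.of("one", "", "two")));
    }
}
```

This makes an ItemProcessor a convenient place to drop invalid records without failing the step.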


ItemWriter Implementation

MyCustomWriter.java

package com.yawintutor;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MyCustomWriter implements ItemWriter<String> {

	@Override
	public void write(List<? extends String> list) throws Exception {
		for (String data : list) {
			System.out.println("MyCustomWriter    : Writing data    : " + data);
		}
		System.out.println("MyCustomWriter    : Writing data    : completed");
	}
}


